You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/30 19:11:38 UTC
[3/6] beam git commit: Adds EmptyMatchTreatment to AvroIO
Adds EmptyMatchTreatment to AvroIO
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/82b08523
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/82b08523
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/82b08523
Branch: refs/heads/master
Commit: 82b08523084aa6f20ea3c4d5b8b89cdbe0378060
Parents: 84eb7f3
Author: Eugene Kirpichov <ek...@gmail.com>
Authored: Wed Aug 16 14:40:52 2017 -0700
Committer: Eugene Kirpichov <ek...@gmail.com>
Committed: Wed Aug 30 11:55:18 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/AvroIO.java | 161 ++++++++++++++-----
.../java/org/apache/beam/sdk/io/AvroSource.java | 22 ++-
.../apache/beam/sdk/io/BlockBasedSource.java | 27 +++-
.../org/apache/beam/sdk/io/FileBasedSource.java | 4 +
4 files changed, 171 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/82b08523/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index d4a7cbb..9601a7d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -39,6 +39,7 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
@@ -69,6 +70,12 @@ import org.apache.beam.sdk.values.TypeDescriptors;
*
* <p>See {@link FileSystems} for information on supported file systems and filepatterns.
*
+ * <h3>Filepattern expansion</h3>
+ *
+ * <p>By default, {@link #read} prohibits filepatterns that match no files, while {@link #readAll}
+ * allows them if the filepattern contains a glob wildcard character. The same defaults apply to
+ * {@link #parseGenericRecords} and {@link #parseAllGenericRecords}, respectively. Use {@link
+ * Read#withEmptyMatchTreatment} to configure this behavior.
+ *
* <h3>Reading records of a known schema</h3>
*
* <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}. To read
@@ -239,6 +246,7 @@ public class AvroIO {
*/
public static <T> Read<T> read(Class<T> recordClass) {
return new AutoValue_AvroIO_Read.Builder<T>()
+ .setEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW)
.setRecordClass(recordClass)
.setSchema(ReflectData.get().getSchema(recordClass))
.setHintMatchesManyFiles(false)
@@ -248,6 +256,7 @@ public class AvroIO {
/** Like {@link #read}, but reads each filepattern in the input {@link PCollection}. */
public static <T> ReadAll<T> readAll(Class<T> recordClass) {
return new AutoValue_AvroIO_ReadAll.Builder<T>()
+ .setEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD)
.setRecordClass(recordClass)
.setSchema(ReflectData.get().getSchema(recordClass))
// 64MB is a reasonable value that allows to amortize the cost of opening files,
@@ -260,6 +269,7 @@ public class AvroIO {
/** Reads Avro file(s) containing records of the specified schema. */
public static Read<GenericRecord> readGenericRecords(Schema schema) {
return new AutoValue_AvroIO_Read.Builder<GenericRecord>()
+ .setEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW)
.setRecordClass(GenericRecord.class)
.setSchema(schema)
.setHintMatchesManyFiles(false)
@@ -272,6 +282,7 @@ public class AvroIO {
*/
public static ReadAll<GenericRecord> readAllGenericRecords(Schema schema) {
return new AutoValue_AvroIO_ReadAll.Builder<GenericRecord>()
+ .setEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD)
.setRecordClass(GenericRecord.class)
.setSchema(schema)
.setDesiredBundleSizeBytes(64 * 1024 * 1024L)
@@ -300,6 +311,7 @@ public class AvroIO {
*/
public static <T> Parse<T> parseGenericRecords(SerializableFunction<GenericRecord, T> parseFn) {
return new AutoValue_AvroIO_Parse.Builder<T>()
+ .setEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW)
.setParseFn(parseFn)
.setHintMatchesManyFiles(false)
.build();
@@ -312,6 +324,7 @@ public class AvroIO {
public static <T> ParseAll<T> parseAllGenericRecords(
SerializableFunction<GenericRecord, T> parseFn) {
return new AutoValue_AvroIO_ParseAll.Builder<T>()
+ .setEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD)
.setParseFn(parseFn)
.setDesiredBundleSizeBytes(64 * 1024 * 1024L)
.build();
@@ -392,6 +405,7 @@ public class AvroIO {
@AutoValue
public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
@Nullable abstract ValueProvider<String> getFilepattern();
+ abstract EmptyMatchTreatment getEmptyMatchTreatment();
@Nullable abstract Class<T> getRecordClass();
@Nullable abstract Schema getSchema();
abstract boolean getHintMatchesManyFiles();
@@ -401,6 +415,7 @@ public class AvroIO {
@AutoValue.Builder
abstract static class Builder<T> {
abstract Builder<T> setFilepattern(ValueProvider<String> filepattern);
+ abstract Builder<T> setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment);
abstract Builder<T> setRecordClass(Class<T> recordClass);
abstract Builder<T> setSchema(Schema schema);
abstract Builder<T> setHintMatchesManyFiles(boolean hintManyFiles);
@@ -424,6 +439,13 @@ public class AvroIO {
}
/**
+ * Configures whether or not a filepattern matching no files is allowed.
+ */
+ public Read<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+ return toBuilder().setEmptyMatchTreatment(treatment).build();
+ }
+
+ /**
* Hints that the filepattern specified in {@link #from(String)} matches a very large number of
* files.
*
@@ -440,37 +462,48 @@ public class AvroIO {
public PCollection<T> expand(PBegin input) {
checkNotNull(getFilepattern(), "filepattern");
checkNotNull(getSchema(), "schema");
- if (getHintMatchesManyFiles()) {
- ReadAll<T> readAll =
- (getRecordClass() == GenericRecord.class)
- ? (ReadAll<T>) readAllGenericRecords(getSchema())
- : readAll(getRecordClass());
- return input
- .apply(Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
- .apply(readAll);
- } else {
- return input
- .getPipeline()
- .apply(
- "Read",
- org.apache.beam.sdk.io.Read.from(
- createSource(getFilepattern(), getRecordClass(), getSchema())));
+
+ if (!getHintMatchesManyFiles()) {
+ return input.apply(
+ "Read",
+ org.apache.beam.sdk.io.Read.from(
+ createSource(
+ getFilepattern(), getEmptyMatchTreatment(), getRecordClass(), getSchema())));
}
+ // All other cases go through ReadAll.
+
+ ReadAll<T> readAll =
+ (getRecordClass() == GenericRecord.class)
+ ? (ReadAll<T>) readAllGenericRecords(getSchema())
+ : readAll(getRecordClass());
+ readAll = readAll.withEmptyMatchTreatment(getEmptyMatchTreatment());
+ return input
+ .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
+ .apply("Via ReadAll", readAll);
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
- builder.addIfNotNull(
- DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"));
+ builder
+ .addIfNotNull(
+ DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"))
+ .add(
+ DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString())
+ .withLabel("Treatment of filepatterns that match no files"));
}
@SuppressWarnings("unchecked")
private static <T> AvroSource<T> createSource(
- ValueProvider<String> filepattern, Class<T> recordClass, Schema schema) {
+ ValueProvider<String> filepattern,
+ EmptyMatchTreatment emptyMatchTreatment,
+ Class<T> recordClass,
+ Schema schema) {
+ AvroSource<?> source =
+ AvroSource.from(filepattern).withEmptyMatchTreatment(emptyMatchTreatment);
return recordClass == GenericRecord.class
- ? (AvroSource<T>) AvroSource.from(filepattern).withSchema(schema)
- : AvroSource.from(filepattern).withSchema(recordClass);
+ ? (AvroSource<T>) source.withSchema(schema)
+ : source.withSchema(recordClass);
}
}
@@ -479,6 +512,7 @@ public class AvroIO {
/** Implementation of {@link #readAll}. */
@AutoValue
public abstract static class ReadAll<T> extends PTransform<PCollection<String>, PCollection<T>> {
+ abstract EmptyMatchTreatment getEmptyMatchTreatment();
@Nullable abstract Class<T> getRecordClass();
@Nullable abstract Schema getSchema();
abstract long getDesiredBundleSizeBytes();
@@ -487,6 +521,7 @@ public class AvroIO {
@AutoValue.Builder
abstract static class Builder<T> {
+ abstract Builder<T> setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment);
abstract Builder<T> setRecordClass(Class<T> recordClass);
abstract Builder<T> setSchema(Schema schema);
abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
@@ -494,6 +529,11 @@ public class AvroIO {
abstract ReadAll<T> build();
}
+ /** Like {@link Read#withEmptyMatchTreatment}. */
+ public ReadAll<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+ return toBuilder().setEmptyMatchTreatment(treatment).build();
+ }
+
@VisibleForTesting
ReadAll<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
@@ -502,24 +542,40 @@ public class AvroIO {
@Override
public PCollection<T> expand(PCollection<String> input) {
checkNotNull(getSchema(), "schema");
+ Match.Filepatterns matchFilepatterns =
+ Match.filepatterns().withEmptyMatchTreatment(getEmptyMatchTreatment());
+
return input
- .apply(Match.filepatterns())
+ .apply(matchFilepatterns)
.apply(
"Read all via FileBasedSource",
new ReadAllViaFileBasedSource<>(
SerializableFunctions.<String, Boolean>constant(true) /* isSplittable */,
getDesiredBundleSizeBytes(),
- new CreateSourceFn<>(getRecordClass(), getSchema().toString())))
+ new CreateSourceFn<>(
+ getEmptyMatchTreatment(), getRecordClass(), getSchema().toString())))
.setCoder(AvroCoder.of(getRecordClass(), getSchema()));
}
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder
+ .add(
+ DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString())
+ .withLabel("Treatment of filepatterns that match no files"));
+ }
}
private static class CreateSourceFn<T>
implements SerializableFunction<String, FileBasedSource<T>> {
+ private final EmptyMatchTreatment emptyMatchTreatment;
private final Class<T> recordClass;
private final Supplier<Schema> schemaSupplier;
- public CreateSourceFn(Class<T> recordClass, String jsonSchema) {
+ public CreateSourceFn(
+ EmptyMatchTreatment emptyMatchTreatment, Class<T> recordClass, String jsonSchema) {
+ this.emptyMatchTreatment = emptyMatchTreatment;
this.recordClass = recordClass;
this.schemaSupplier = AvroUtils.serializableSchemaSupplier(jsonSchema);
}
@@ -527,7 +583,7 @@ public class AvroIO {
@Override
public FileBasedSource<T> apply(String input) {
return Read.createSource(
- StaticValueProvider.of(input), recordClass, schemaSupplier.get());
+ StaticValueProvider.of(input), emptyMatchTreatment, recordClass, schemaSupplier.get());
}
}
@@ -537,6 +593,7 @@ public class AvroIO {
@AutoValue
public abstract static class Parse<T> extends PTransform<PBegin, PCollection<T>> {
@Nullable abstract ValueProvider<String> getFilepattern();
+ abstract EmptyMatchTreatment getEmptyMatchTreatment();
abstract SerializableFunction<GenericRecord, T> getParseFn();
@Nullable abstract Coder<T> getCoder();
abstract boolean getHintMatchesManyFiles();
@@ -546,6 +603,7 @@ public class AvroIO {
@AutoValue.Builder
abstract static class Builder<T> {
abstract Builder<T> setFilepattern(ValueProvider<String> filepattern);
+ abstract Builder<T> setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment);
abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn);
abstract Builder<T> setCoder(Coder<T> coder);
abstract Builder<T> setHintMatchesManyFiles(boolean hintMatchesManyFiles);
@@ -563,6 +621,11 @@ public class AvroIO {
return toBuilder().setFilepattern(filepattern).build();
}
+ /** Like {@link Read#withEmptyMatchTreatment}. */
+ public Parse<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+ return toBuilder().setEmptyMatchTreatment(treatment).build();
+ }
+
/** Sets a coder for the result of the parse function. */
public Parse<T> withCoder(Coder<T> coder) {
return toBuilder().setCoder(coder).build();
@@ -577,14 +640,20 @@ public class AvroIO {
public PCollection<T> expand(PBegin input) {
checkNotNull(getFilepattern(), "filepattern");
Coder<T> coder = inferCoder(getCoder(), getParseFn(), input.getPipeline().getCoderRegistry());
- if (getHintMatchesManyFiles()) {
- return input
- .apply(Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
- .apply(parseAllGenericRecords(getParseFn()).withCoder(getCoder()));
+
+ if (!getHintMatchesManyFiles()) {
+ return input.apply(
+ org.apache.beam.sdk.io.Read.from(
+ AvroSource.from(getFilepattern()).withParseFn(getParseFn(), coder)));
}
- return input.apply(
- org.apache.beam.sdk.io.Read.from(
- AvroSource.from(getFilepattern()).withParseFn(getParseFn(), coder)));
+ // All other cases go through ParseAllGenericRecords.
+ ParseAll<T> parseAll =
+ parseAllGenericRecords(getParseFn())
+ .withCoder(coder)
+ .withEmptyMatchTreatment(getEmptyMatchTreatment());
+ return input
+ .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
+ .apply("Via ParseAll", parseAll);
}
private static <T> Coder<T> inferCoder(
@@ -612,7 +681,10 @@ public class AvroIO {
builder
.addIfNotNull(
DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"))
- .add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function"));
+ .add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function"))
+ .add(
+ DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString())
+ .withLabel("Treatment of filepatterns that match no files"));
}
}
@@ -621,6 +693,7 @@ public class AvroIO {
/** Implementation of {@link #parseAllGenericRecords}. */
@AutoValue
public abstract static class ParseAll<T> extends PTransform<PCollection<String>, PCollection<T>> {
+ abstract EmptyMatchTreatment getEmptyMatchTreatment();
abstract SerializableFunction<GenericRecord, T> getParseFn();
@Nullable abstract Coder<T> getCoder();
abstract long getDesiredBundleSizeBytes();
@@ -629,6 +702,7 @@ public class AvroIO {
@AutoValue.Builder
abstract static class Builder<T> {
+ abstract Builder<T> setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment);
abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn);
abstract Builder<T> setCoder(Coder<T> coder);
abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
@@ -636,6 +710,11 @@ public class AvroIO {
abstract ParseAll<T> build();
}
+ /** Like {@link Read#withEmptyMatchTreatment}. */
+ public ParseAll<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+ return toBuilder().setEmptyMatchTreatment(treatment).build();
+ }
+
/** Specifies the coder for the result of the {@code parseFn}. */
public ParseAll<T> withCoder(Coder<T> coder) {
return toBuilder().setCoder(coder).build();
@@ -650,15 +729,21 @@ public class AvroIO {
public PCollection<T> expand(PCollection<String> input) {
final Coder<T> coder =
Parse.inferCoder(getCoder(), getParseFn(), input.getPipeline().getCoderRegistry());
- SerializableFunction<String, FileBasedSource<T>> createSource =
+ final SerializableFunction<GenericRecord, T> parseFn = getParseFn();
+ final EmptyMatchTreatment emptyMatchTreatment = getEmptyMatchTreatment();
+ final SerializableFunction<String, FileBasedSource<T>> createSource =
new SerializableFunction<String, FileBasedSource<T>>() {
@Override
public FileBasedSource<T> apply(String input) {
- return AvroSource.from(input).withParseFn(getParseFn(), coder);
+ return AvroSource.from(input)
+ .withParseFn(parseFn, coder)
+ .withEmptyMatchTreatment(emptyMatchTreatment);
}
};
+ Match.Filepatterns matchFilepatterns =
+ Match.filepatterns().withEmptyMatchTreatment(emptyMatchTreatment);
return input
- .apply(Match.filepatterns())
+ .apply(matchFilepatterns)
.apply(
"Parse all via FileBasedSource",
new ReadAllViaFileBasedSource<>(
@@ -671,7 +756,11 @@ public class AvroIO {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
- builder.add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function"));
+ builder
+ .add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function"))
+ .add(
+ DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString())
+ .withLabel("Treatment of filepatterns that match no files"));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/82b08523/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
index 8dd3125..2600d76 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
@@ -57,6 +57,7 @@ import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -211,6 +212,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
public static AvroSource<GenericRecord> from(ValueProvider<String> fileNameOrPattern) {
return new AvroSource<>(
fileNameOrPattern,
+ EmptyMatchTreatment.DISALLOW,
DEFAULT_MIN_BUNDLE_SIZE,
readGenericRecordsWithSchema(null /* will need to be specified in withSchema */));
}
@@ -220,11 +222,20 @@ public class AvroSource<T> extends BlockBasedSource<T> {
return from(ValueProvider.StaticValueProvider.of(fileNameOrPattern));
}
+ /**
+ * Returns a new {@code AvroSource} that uses the given {@link EmptyMatchTreatment}, which
+ * configures whether a filepattern matching no files is allowed. The filepattern/spec provider,
+ * minimum bundle size and read mode are carried over from this source unchanged.
+ *
+ * <p>NOTE(review): this always goes through the FILEPATTERN-mode constructor using
+ * {@code getFileOrPatternSpecProvider()}; presumably it is only meant to be called on
+ * filepattern-mode sources — confirm.
+ */
+ public AvroSource<T> withEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment) {
+ return new AvroSource<T>(
+ getFileOrPatternSpecProvider(),
+ emptyMatchTreatment,
+ getMinBundleSize(),
+ mode);
+ }
+
/** Reads files containing records that conform to the given schema. */
public AvroSource<GenericRecord> withSchema(String schema) {
checkNotNull(schema, "schema");
return new AvroSource<>(
getFileOrPatternSpecProvider(),
+ getEmptyMatchTreatment(),
getMinBundleSize(),
readGenericRecordsWithSchema(schema));
}
@@ -240,6 +251,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
checkNotNull(clazz, "clazz");
return new AvroSource<>(
getFileOrPatternSpecProvider(),
+ getEmptyMatchTreatment(),
getMinBundleSize(),
readGeneratedClasses(clazz));
}
@@ -254,6 +266,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
checkNotNull(parseFn, "coder");
return new AvroSource<>(
getFileOrPatternSpecProvider(),
+ getEmptyMatchTreatment(),
getMinBundleSize(),
parseGenericRecords(parseFn, coder));
}
@@ -263,15 +276,17 @@ public class AvroSource<T> extends BlockBasedSource<T> {
* minBundleSize} and its use.
*/
public AvroSource<T> withMinBundleSize(long minBundleSize) {
- return new AvroSource<>(getFileOrPatternSpecProvider(), minBundleSize, mode);
+ return new AvroSource<>(
+ getFileOrPatternSpecProvider(), getEmptyMatchTreatment(), minBundleSize, mode);
}
/** Constructor for FILEPATTERN mode. */
private AvroSource(
ValueProvider<String> fileNameOrPattern,
+ EmptyMatchTreatment emptyMatchTreatment,
long minBundleSize,
Mode<T> mode) {
- super(fileNameOrPattern, minBundleSize);
+ super(fileNameOrPattern, emptyMatchTreatment, minBundleSize);
this.mode = mode;
}
@@ -463,7 +478,8 @@ public class AvroSource<T> extends BlockBasedSource<T> {
return new AvroSource<>(
getSingleFileMetadata(), getMinBundleSize(), getStartOffset(), getEndOffset(), mode);
case FILEPATTERN:
- return new AvroSource<>(getFileOrPatternSpecProvider(), getMinBundleSize(), mode);
+ return new AvroSource<>(
+ getFileOrPatternSpecProvider(), getEmptyMatchTreatment(), getMinBundleSize(), mode);
default:
throw new InvalidObjectException(
String.format("Unknown mode %s for AvroSource %s", getMode(), this));
http://git-wip-us.apache.org/repos/asf/beam/blob/82b08523/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
index 25e8483..ec4f4ad 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
@@ -63,18 +64,36 @@ import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
public abstract class BlockBasedSource<T> extends FileBasedSource<T> {
/**
* Creates a {@code BlockBasedSource} based on a file name or pattern. Subclasses must call this
- * constructor when creating a {@code BlockBasedSource} for a file pattern. See
- * {@link FileBasedSource} for more information.
+ * constructor when creating a {@code BlockBasedSource} for a file pattern. See {@link
+ * FileBasedSource} for more information.
+ */
+ public BlockBasedSource(
+ String fileOrPatternSpec, EmptyMatchTreatment emptyMatchTreatment, long minBundleSize) {
+ this(StaticValueProvider.of(fileOrPatternSpec), emptyMatchTreatment, minBundleSize);
+ }
+
+ /**
+ * Like {@link #BlockBasedSource(String, EmptyMatchTreatment, long)} but with a default {@link
+ * EmptyMatchTreatment} of {@link EmptyMatchTreatment#DISALLOW}.
*/
public BlockBasedSource(String fileOrPatternSpec, long minBundleSize) {
- super(StaticValueProvider.of(fileOrPatternSpec), minBundleSize);
+ this(StaticValueProvider.of(fileOrPatternSpec), minBundleSize);
}
/** Like {@link #BlockBasedSource(String, long)}. */
public BlockBasedSource(ValueProvider<String> fileOrPatternSpec, long minBundleSize) {
- super(fileOrPatternSpec, minBundleSize);
+ this(fileOrPatternSpec, EmptyMatchTreatment.DISALLOW, minBundleSize);
}
+ /** Like {@link #BlockBasedSource(String, EmptyMatchTreatment, long)}. */
+ public BlockBasedSource(
+ ValueProvider<String> fileOrPatternSpec,
+ EmptyMatchTreatment emptyMatchTreatment,
+ long minBundleSize) {
+ super(fileOrPatternSpec, emptyMatchTreatment, minBundleSize);
+ }
+
+
/**
* Creates a {@code BlockBasedSource} for a single file. Subclasses must call this constructor
* when implementing {@link BlockBasedSource#createForSubrangeOfFile}. See documentation in
http://git-wip-us.apache.org/repos/asf/beam/blob/82b08523/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
index f835fa4..dabda84 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
@@ -154,6 +154,10 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
return fileOrPatternSpec;
}
+ /**
+ * Returns the {@link EmptyMatchTreatment} this source was constructed with, i.e. how a
+ * filepattern that matches no files is treated.
+ */
+ public final EmptyMatchTreatment getEmptyMatchTreatment() {
+ return emptyMatchTreatment;
+ }
+
public final Mode getMode() {
return mode;
}