You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/30 19:11:36 UTC
[1/6] beam git commit: Adds AvroIO watchForNewFiles
Repository: beam
Updated Branches:
refs/heads/master d64f2cce8 -> 5c2cab017
Adds AvroIO watchForNewFiles
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f1f39871
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f1f39871
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f1f39871
Branch: refs/heads/master
Commit: f1f39871da3668bb2ffbc1c27449d36c995b645b
Parents: 82b0852
Author: Eugene Kirpichov <ek...@gmail.com>
Authored: Wed Aug 16 14:41:32 2017 -0700
Committer: Eugene Kirpichov <ek...@gmail.com>
Committed: Wed Aug 30 11:55:18 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/AvroIO.java | 132 ++++++++++++++++++-
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 94 +++++++++++--
2 files changed, 212 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/f1f39871/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 9601a7d..f6f3308 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.sdk.transforms.Watch.Growth.ignoreInput;
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
@@ -48,12 +49,14 @@ import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
+import org.joda.time.Duration;
/**
* {@link PTransform}s for reading and writing Avro files.
@@ -76,6 +79,9 @@ import org.apache.beam.sdk.values.TypeDescriptors;
* allows them in case the filepattern contains a glob wildcard character. Use {@link
* Read#withEmptyMatchTreatment} to configure this behavior.
*
+ * <p>By default, the filepatterns are expanded only once. {@link Read#watchForNewFiles}
+ * allows streaming of new files matching the filepattern(s).
+ *
* <h3>Reading records of a known schema</h3>
*
* <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}. To read
@@ -137,6 +143,20 @@ import org.apache.beam.sdk.values.TypeDescriptors;
* filepatterns.apply(AvroIO.parseAllGenericRecords(new SerializableFunction...);
* }</pre>
*
+ * <h3>Streaming new files matching a filepattern</h3>
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * PCollection<AvroAutoGenClass> records = p.apply(AvroIO
+ * .read(AvroAutoGenClass.class)
+ * .from("gs://my_bucket/path/to/records-*.avro")
+ * .watchForNewFiles(
+ * // Check for new files every minute
+ * Duration.standardMinutes(1),
+ * // Stop watching the filepattern if no new files appear within an hour
+ * afterTimeSinceNewOutput(Duration.standardHours(1))));
+ * }</pre>
+ *
* <h3>Reading a very large number of files</h3>
*
* <p>If it is known that the filepattern will match a very large number of files (e.g. tens of
@@ -406,6 +426,8 @@ public class AvroIO {
public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
@Nullable abstract ValueProvider<String> getFilepattern();
abstract EmptyMatchTreatment getEmptyMatchTreatment();
+ @Nullable abstract Duration getWatchForNewFilesInterval();
+ @Nullable abstract TerminationCondition<?, ?> getWatchForNewFilesTerminationCondition();
@Nullable abstract Class<T> getRecordClass();
@Nullable abstract Schema getSchema();
abstract boolean getHintMatchesManyFiles();
@@ -416,6 +438,9 @@ public class AvroIO {
abstract static class Builder<T> {
abstract Builder<T> setFilepattern(ValueProvider<String> filepattern);
abstract Builder<T> setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment);
+ abstract Builder<T> setWatchForNewFilesInterval(Duration watchForNewFilesInterval);
+ abstract Builder<T> setWatchForNewFilesTerminationCondition(
+ TerminationCondition<?, ?> condition);
abstract Builder<T> setRecordClass(Class<T> recordClass);
abstract Builder<T> setSchema(Schema schema);
abstract Builder<T> setHintMatchesManyFiles(boolean hintManyFiles);
@@ -446,6 +471,24 @@ public class AvroIO {
}
/**
+ * Continuously watches for new files matching the filepattern, polling it at the given
+ * interval, until the given termination condition is reached. The returned {@link PCollection}
+ * is unbounded.
+ *
+ * <p>This works only in runners supporting {@link Kind#SPLITTABLE_DO_FN}.
+ *
+ * @see TerminationCondition
+ */
+ @Experimental(Kind.SPLITTABLE_DO_FN)
+ public Read<T> watchForNewFiles(
+ Duration pollInterval, TerminationCondition<?, ?> terminationCondition) {
+ return toBuilder()
+ .setWatchForNewFilesInterval(pollInterval)
+ .setWatchForNewFilesTerminationCondition(terminationCondition)
+ .build();
+ }
+
+ /**
* Hints that the filepattern specified in {@link #from(String)} matches a very large number of
* files.
*
@@ -463,7 +506,7 @@ public class AvroIO {
checkNotNull(getFilepattern(), "filepattern");
checkNotNull(getSchema(), "schema");
- if (!getHintMatchesManyFiles()) {
+ if (getWatchForNewFilesInterval() == null && !getHintMatchesManyFiles()) {
return input.apply(
"Read",
org.apache.beam.sdk.io.Read.from(
@@ -477,6 +520,11 @@ public class AvroIO {
? (ReadAll<T>) readAllGenericRecords(getSchema())
: readAll(getRecordClass());
readAll = readAll.withEmptyMatchTreatment(getEmptyMatchTreatment());
+ if (getWatchForNewFilesInterval() != null) {
+ TerminationCondition<String, ?> readAllCondition =
+ ignoreInput(getWatchForNewFilesTerminationCondition());
+ readAll = readAll.watchForNewFiles(getWatchForNewFilesInterval(), readAllCondition);
+ }
return input
.apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
.apply("Via ReadAll", readAll);
@@ -490,7 +538,10 @@ public class AvroIO {
DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"))
.add(
DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString())
- .withLabel("Treatment of filepatterns that match no files"));
+ .withLabel("Treatment of filepatterns that match no files"))
+ .addIfNotNull(
+ DisplayData.item("watchForNewFilesInterval", getWatchForNewFilesInterval())
+ .withLabel("Interval to watch for new files"));
}
@SuppressWarnings("unchecked")
@@ -513,6 +564,8 @@ public class AvroIO {
@AutoValue
public abstract static class ReadAll<T> extends PTransform<PCollection<String>, PCollection<T>> {
abstract EmptyMatchTreatment getEmptyMatchTreatment();
+ @Nullable abstract Duration getWatchForNewFilesInterval();
+ @Nullable abstract TerminationCondition<String, ?> getWatchForNewFilesTerminationCondition();
@Nullable abstract Class<T> getRecordClass();
@Nullable abstract Schema getSchema();
abstract long getDesiredBundleSizeBytes();
@@ -522,6 +575,9 @@ public class AvroIO {
@AutoValue.Builder
abstract static class Builder<T> {
abstract Builder<T> setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment);
+ abstract Builder<T> setWatchForNewFilesInterval(Duration watchForNewFilesInterval);
+ abstract Builder<T> setWatchForNewFilesTerminationCondition(
+ TerminationCondition<String, ?> condition);
abstract Builder<T> setRecordClass(Class<T> recordClass);
abstract Builder<T> setSchema(Schema schema);
abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
@@ -534,6 +590,16 @@ public class AvroIO {
return toBuilder().setEmptyMatchTreatment(treatment).build();
}
+ /** Like {@link Read#watchForNewFiles}. */
+ @Experimental(Kind.SPLITTABLE_DO_FN)
+ public ReadAll<T> watchForNewFiles(
+ Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
+ return toBuilder()
+ .setWatchForNewFilesInterval(pollInterval)
+ .setWatchForNewFilesTerminationCondition(terminationCondition)
+ .build();
+ }
+
@VisibleForTesting
ReadAll<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
@@ -544,6 +610,11 @@ public class AvroIO {
checkNotNull(getSchema(), "schema");
Match.Filepatterns matchFilepatterns =
Match.filepatterns().withEmptyMatchTreatment(getEmptyMatchTreatment());
+ if (getWatchForNewFilesInterval() != null) {
+ matchFilepatterns =
+ matchFilepatterns.continuously(
+ getWatchForNewFilesInterval(), getWatchForNewFilesTerminationCondition());
+ }
return input
.apply(matchFilepatterns)
@@ -563,7 +634,10 @@ public class AvroIO {
builder
.add(
DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString())
- .withLabel("Treatment of filepatterns that match no files"));
+ .withLabel("Treatment of filepatterns that match no files"))
+ .addIfNotNull(
+ DisplayData.item("watchForNewFilesInterval", getWatchForNewFilesInterval())
+ .withLabel("Interval to watch for new files"));
}
}
@@ -594,6 +668,8 @@ public class AvroIO {
public abstract static class Parse<T> extends PTransform<PBegin, PCollection<T>> {
@Nullable abstract ValueProvider<String> getFilepattern();
abstract EmptyMatchTreatment getEmptyMatchTreatment();
+ @Nullable abstract Duration getWatchForNewFilesInterval();
+ @Nullable abstract TerminationCondition<?, ?> getWatchForNewFilesTerminationCondition();
abstract SerializableFunction<GenericRecord, T> getParseFn();
@Nullable abstract Coder<T> getCoder();
abstract boolean getHintMatchesManyFiles();
@@ -604,6 +680,9 @@ public class AvroIO {
abstract static class Builder<T> {
abstract Builder<T> setFilepattern(ValueProvider<String> filepattern);
abstract Builder<T> setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment);
+ abstract Builder<T> setWatchForNewFilesInterval(Duration watchForNewFilesInterval);
+ abstract Builder<T> setWatchForNewFilesTerminationCondition(
+ TerminationCondition<?, ?> condition);
abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn);
abstract Builder<T> setCoder(Coder<T> coder);
abstract Builder<T> setHintMatchesManyFiles(boolean hintMatchesManyFiles);
@@ -626,6 +705,16 @@ public class AvroIO {
return toBuilder().setEmptyMatchTreatment(treatment).build();
}
+ /** Like {@link Read#watchForNewFiles}. */
+ @Experimental(Kind.SPLITTABLE_DO_FN)
+ public Parse<T> watchForNewFiles(
+ Duration pollInterval, TerminationCondition<?, ?> terminationCondition) {
+ return toBuilder()
+ .setWatchForNewFilesInterval(pollInterval)
+ .setWatchForNewFilesTerminationCondition(terminationCondition)
+ .build();
+ }
+
/** Sets a coder for the result of the parse function. */
public Parse<T> withCoder(Coder<T> coder) {
return toBuilder().setCoder(coder).build();
@@ -641,7 +730,7 @@ public class AvroIO {
checkNotNull(getFilepattern(), "filepattern");
Coder<T> coder = inferCoder(getCoder(), getParseFn(), input.getPipeline().getCoderRegistry());
- if (!getHintMatchesManyFiles()) {
+ if (getWatchForNewFilesInterval() == null && !getHintMatchesManyFiles()) {
return input.apply(
org.apache.beam.sdk.io.Read.from(
AvroSource.from(getFilepattern()).withParseFn(getParseFn(), coder)));
@@ -651,6 +740,11 @@ public class AvroIO {
parseAllGenericRecords(getParseFn())
.withCoder(coder)
.withEmptyMatchTreatment(getEmptyMatchTreatment());
+ if (getWatchForNewFilesInterval() != null) {
+ TerminationCondition<String, ?> parseAllCondition =
+ ignoreInput(getWatchForNewFilesTerminationCondition());
+ parseAll = parseAll.watchForNewFiles(getWatchForNewFilesInterval(), parseAllCondition);
+ }
return input
.apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
.apply("Via ParseAll", parseAll);
@@ -684,7 +778,10 @@ public class AvroIO {
.add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function"))
.add(
DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString())
- .withLabel("Treatment of filepatterns that match no files"));
+ .withLabel("Treatment of filepatterns that match no files"))
+ .addIfNotNull(
+ DisplayData.item("watchForNewFilesInterval", getWatchForNewFilesInterval())
+ .withLabel("Interval to watch for new files"));
}
}
@@ -694,6 +791,8 @@ public class AvroIO {
@AutoValue
public abstract static class ParseAll<T> extends PTransform<PCollection<String>, PCollection<T>> {
abstract EmptyMatchTreatment getEmptyMatchTreatment();
+ @Nullable abstract Duration getWatchForNewFilesInterval();
+ @Nullable abstract TerminationCondition<String, ?> getWatchForNewFilesTerminationCondition();
abstract SerializableFunction<GenericRecord, T> getParseFn();
@Nullable abstract Coder<T> getCoder();
abstract long getDesiredBundleSizeBytes();
@@ -703,6 +802,9 @@ public class AvroIO {
@AutoValue.Builder
abstract static class Builder<T> {
abstract Builder<T> setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment);
+ abstract Builder<T> setWatchForNewFilesInterval(Duration watchForNewFilesInterval);
+ abstract Builder<T> setWatchForNewFilesTerminationCondition(
+ TerminationCondition<String, ?> condition);
abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn);
abstract Builder<T> setCoder(Coder<T> coder);
abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
@@ -715,6 +817,16 @@ public class AvroIO {
return toBuilder().setEmptyMatchTreatment(treatment).build();
}
+ /** Like {@link Read#watchForNewFiles}. */
+ @Experimental(Kind.SPLITTABLE_DO_FN)
+ public ParseAll<T> watchForNewFiles(
+ Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
+ return toBuilder()
+ .setWatchForNewFilesInterval(pollInterval)
+ .setWatchForNewFilesTerminationCondition(terminationCondition)
+ .build();
+ }
+
/** Specifies the coder for the result of the {@code parseFn}. */
public ParseAll<T> withCoder(Coder<T> coder) {
return toBuilder().setCoder(coder).build();
@@ -742,6 +854,11 @@ public class AvroIO {
};
Match.Filepatterns matchFilepatterns =
Match.filepatterns().withEmptyMatchTreatment(emptyMatchTreatment);
+ if (getWatchForNewFilesInterval() != null) {
+ matchFilepatterns =
+ matchFilepatterns.continuously(
+ getWatchForNewFilesInterval(), getWatchForNewFilesTerminationCondition());
+ }
return input
.apply(matchFilepatterns)
.apply(
@@ -760,7 +877,10 @@ public class AvroIO {
.add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function"))
.add(
DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString())
- .withLabel("Treatment of filepatterns that match no files"));
+ .withLabel("Treatment of filepatterns that match no files"))
+ .addIfNotNull(
+ DisplayData.item("watchForNewFilesInterval", getWatchForNewFilesInterval())
+ .withLabel("Interval to watch for new files"));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/f1f39871/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index d0aa02c..f49443d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -57,6 +57,7 @@ import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.Nullable;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -73,8 +74,11 @@ import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.UsesTestStream;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.Watch;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -260,19 +264,84 @@ public class AvroIOTest {
.withNumShards(3));
writePipeline.run().waitUntilFinish();
- // Test read(), readAll(), and parseAllGenericRecords().
+ // Test readAll() and parseAllGenericRecords().
+ PCollection<String> paths =
+ readPipeline.apply(
+ "Create paths",
+ Create.of(
+ tmpFolder.getRoot().getAbsolutePath() + "/first*",
+ tmpFolder.getRoot().getAbsolutePath() + "/second*"));
+ PAssert.that(
+ paths.apply(
+ "Read all", AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10)))
+ .containsInAnyOrder(Iterables.concat(firstValues, secondValues));
+ PAssert.that(
+ paths.apply(
+ "Parse all",
+ AvroIO.parseAllGenericRecords(new ParseGenericClass())
+ .withCoder(AvroCoder.of(GenericClass.class))
+ .withDesiredBundleSizeBytes(10)))
+ .containsInAnyOrder(Iterables.concat(firstValues, secondValues));
+
+ readPipeline.run();
+ }
+
+ private static class CreateGenericClass extends SimpleFunction<Long, GenericClass> {
+ @Override
+ public GenericClass apply(Long i) {
+ return new GenericClass(i.intValue(), "value" + i);
+ }
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void testAvroIOContinuouslyWriteAndReadMultipleFilepatterns() throws Throwable {
+ SimpleFunction<Long, GenericClass> mapFn = new CreateGenericClass();
+ List<GenericClass> firstValues = Lists.newArrayList();
+ List<GenericClass> secondValues = Lists.newArrayList();
+ for (int i = 0; i < 7; ++i) {
+ (i < 3 ? firstValues : secondValues).add(mapFn.apply((long) i));
+ }
+ writePipeline.apply(
+ "Sequence first",
+ GenerateSequence.from(0).to(3).withRate(1, Duration.millis(300)))
+ .apply("Map first", MapElements.via(mapFn))
+ .apply(
+ "Write first",
+ AvroIO.write(GenericClass.class)
+ .to(tmpFolder.getRoot().getAbsolutePath() + "/first")
+ .withNumShards(2));
+ writePipeline.apply(
+ "Sequence second",
+ GenerateSequence.from(3).to(7).withRate(1, Duration.millis(300)))
+ .apply("Map second", MapElements.via(mapFn))
+ .apply(
+ "Write second",
+ AvroIO.write(GenericClass.class)
+ .to(tmpFolder.getRoot().getAbsolutePath() + "/second")
+ .withNumShards(3));
+ PipelineResult writeRes = writePipeline.run();
+
+ // Test read(), readAll(), parse(), and parseAllGenericRecords() with watchForNewFiles().
PAssert.that(
readPipeline.apply(
- "Read first",
+ "Read",
AvroIO.read(GenericClass.class)
- .from(tmpFolder.getRoot().getAbsolutePath() + "/first*")))
+ .from(tmpFolder.getRoot().getAbsolutePath() + "/first*")
+ .watchForNewFiles(
+ Duration.millis(100),
+ Watch.Growth.<Void>afterTimeSinceNewOutput(Duration.standardSeconds(3)))))
.containsInAnyOrder(firstValues);
PAssert.that(
readPipeline.apply(
- "Read second",
- AvroIO.read(GenericClass.class)
- .from(tmpFolder.getRoot().getAbsolutePath() + "/second*")))
- .containsInAnyOrder(secondValues);
+ "Parse",
+ AvroIO.parseGenericRecords(new ParseGenericClass())
+ .from(tmpFolder.getRoot().getAbsolutePath() + "/first*")
+ .watchForNewFiles(
+ Duration.millis(100),
+ Watch.Growth.<Void>afterTimeSinceNewOutput(Duration.standardSeconds(3)))))
+ .containsInAnyOrder(firstValues);
+
PCollection<String> paths =
readPipeline.apply(
"Create paths",
@@ -281,17 +350,26 @@ public class AvroIOTest {
tmpFolder.getRoot().getAbsolutePath() + "/second*"));
PAssert.that(
paths.apply(
- "Read all", AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10)))
+ "Read all",
+ AvroIO.readAll(GenericClass.class)
+ .watchForNewFiles(
+ Duration.millis(100),
+ Watch.Growth.<String>afterTimeSinceNewOutput(Duration.standardSeconds(3)))
+ .withDesiredBundleSizeBytes(10)))
.containsInAnyOrder(Iterables.concat(firstValues, secondValues));
PAssert.that(
paths.apply(
"Parse all",
AvroIO.parseAllGenericRecords(new ParseGenericClass())
.withCoder(AvroCoder.of(GenericClass.class))
+ .watchForNewFiles(
+ Duration.millis(100),
+ Watch.Growth.<String>afterTimeSinceNewOutput(Duration.standardSeconds(3)))
.withDesiredBundleSizeBytes(10)))
.containsInAnyOrder(Iterables.concat(firstValues, secondValues));
readPipeline.run();
+ writeRes.waitUntilFinish();
}
@Test
[2/6] beam git commit: Gets rid of raw type in
TextIO.Read.watchForNewFiles
Posted by jk...@apache.org.
Gets rid of raw type in TextIO.Read.watchForNewFiles
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/184f7a9b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/184f7a9b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/184f7a9b
Branch: refs/heads/master
Commit: 184f7a9b31641641cdb4bc7ddcf3556c0514f71b
Parents: d64f2cc
Author: Eugene Kirpichov <ek...@gmail.com>
Authored: Wed Aug 16 14:25:33 2017 -0700
Committer: Eugene Kirpichov <ek...@gmail.com>
Committed: Wed Aug 30 11:55:18 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/TextIO.java | 15 ++++---
.../org/apache/beam/sdk/transforms/Watch.java | 42 ++++++++++++++++++++
2 files changed, 51 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/184f7a9b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index cbc17ff..835008f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
+import static org.apache.beam.sdk.transforms.Watch.Growth.ignoreInput;
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
@@ -250,7 +251,7 @@ public class TextIO {
abstract Duration getWatchForNewFilesInterval();
@Nullable
- abstract TerminationCondition getWatchForNewFilesTerminationCondition();
+ abstract TerminationCondition<?, ?> getWatchForNewFilesTerminationCondition();
abstract boolean getHintMatchesManyFiles();
abstract EmptyMatchTreatment getEmptyMatchTreatment();
@@ -262,7 +263,8 @@ public class TextIO {
abstract Builder setFilepattern(ValueProvider<String> filepattern);
abstract Builder setCompressionType(CompressionType compressionType);
abstract Builder setWatchForNewFilesInterval(Duration watchForNewFilesInterval);
- abstract Builder setWatchForNewFilesTerminationCondition(TerminationCondition condition);
+ abstract Builder setWatchForNewFilesTerminationCondition(
+ TerminationCondition<?, ?> condition);
abstract Builder setHintMatchesManyFiles(boolean hintManyFiles);
abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment);
@@ -312,7 +314,8 @@ public class TextIO {
* @see TerminationCondition
*/
@Experimental(Kind.SPLITTABLE_DO_FN)
- public Read watchForNewFiles(Duration pollInterval, TerminationCondition terminationCondition) {
+ public Read watchForNewFiles(
+ Duration pollInterval, TerminationCondition<?, ?> terminationCondition) {
return toBuilder()
.setWatchForNewFilesInterval(pollInterval)
.setWatchForNewFilesTerminationCondition(terminationCondition)
@@ -352,9 +355,9 @@ public class TextIO {
.withCompressionType(getCompressionType())
.withEmptyMatchTreatment(getEmptyMatchTreatment());
if (getWatchForNewFilesInterval() != null) {
- readAll =
- readAll.watchForNewFiles(
- getWatchForNewFilesInterval(), getWatchForNewFilesTerminationCondition());
+ TerminationCondition<String, ?> readAllCondition =
+ ignoreInput(getWatchForNewFilesTerminationCondition());
+ readAll = readAll.watchForNewFiles(getWatchForNewFilesInterval(), readAllCondition);
}
return input
.apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
http://git-wip-us.apache.org/repos/asf/beam/blob/184f7a9b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
index 9da2408..21f0641 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
@@ -264,6 +264,15 @@ public class Watch {
}
/**
+ * Wraps a given input-independent {@link TerminationCondition} as an equivalent condition
+ * with a given input type, passing {@code null} to the original condition as input.
+ */
+ public static <InputT, StateT> TerminationCondition<InputT, StateT> ignoreInput(
+ TerminationCondition<?, StateT> condition) {
+ return new IgnoreInput<>(condition);
+ }
+
+ /**
* Returns a {@link TerminationCondition} that holds after the given time has elapsed after the
* current input was seen.
*/
@@ -344,6 +353,39 @@ public class Watch {
}
}
+ static class IgnoreInput<InputT, StateT> implements TerminationCondition<InputT, StateT> {
+ private final TerminationCondition<?, StateT> wrapped;
+
+ IgnoreInput(TerminationCondition<?, StateT> wrapped) {
+ this.wrapped = wrapped;
+ }
+
+ @Override
+ public Coder<StateT> getStateCoder() {
+ return wrapped.getStateCoder();
+ }
+
+ @Override
+ public StateT forNewInput(Instant now, InputT input) {
+ return wrapped.forNewInput(now, null);
+ }
+
+ @Override
+ public StateT onSeenNewOutput(Instant now, StateT state) {
+ return wrapped.onSeenNewOutput(now, state);
+ }
+
+ @Override
+ public boolean canStopPolling(Instant now, StateT state) {
+ return wrapped.canStopPolling(now, state);
+ }
+
+ @Override
+ public String toString(StateT state) {
+ return wrapped.toString(state);
+ }
+ }
+
static class AfterTotalOf<InputT>
implements TerminationCondition<
InputT, KV<Instant /* timeStarted */, ReadableDuration /* maxTimeSinceInput */>> {
[4/6] beam git commit: Better-organized javadocs for TextIO and AvroIO
Posted by jk...@apache.org.
Better-organized javadocs for TextIO and AvroIO
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/84eb7f3a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/84eb7f3a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/84eb7f3a
Branch: refs/heads/master
Commit: 84eb7f3ae431b467828a76e305123601d4ee333a
Parents: 184f7a9
Author: Eugene Kirpichov <ek...@gmail.com>
Authored: Wed Aug 16 14:29:52 2017 -0700
Committer: Eugene Kirpichov <ek...@gmail.com>
Committed: Wed Aug 30 11:55:18 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/AvroIO.java | 83 +++++++++++++-------
.../java/org/apache/beam/sdk/io/TextIO.java | 30 ++++---
2 files changed, 75 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/84eb7f3a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 9e0422e..d4a7cbb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -57,13 +57,20 @@ import org.apache.beam.sdk.values.TypeDescriptors;
/**
* {@link PTransform}s for reading and writing Avro files.
*
+ * <h2>Reading Avro files</h2>
+ *
* <p>To read a {@link PCollection} from one or more Avro files with the same schema known at
- * pipeline construction time, use {@code AvroIO.read()}, using {@link AvroIO.Read#from} to specify
- * the filename or filepattern to read from. Alternatively, if the filepatterns to be read are
- * themselves in a {@link PCollection}, apply {@link #readAll}.
+ * pipeline construction time, use {@link #read}, using {@link AvroIO.Read#from} to specify the
+ * filename or filepattern to read from. If the filepatterns to be read are themselves in a {@link
+ * PCollection}, apply {@link #readAll}. If the schema is unknown at pipeline construction time, use
+ * {@link #parseGenericRecords} or {@link #parseAllGenericRecords}.
+ *
+ * <p>Many configuration options below apply to several or all of these transforms.
*
* <p>See {@link FileSystems} for information on supported file systems and filepatterns.
*
+ * <h3>Reading records of a known schema</h3>
+ *
* <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}. To read
* {@link GenericRecord GenericRecords}, use {@link #readGenericRecords(Schema)} which takes a
* {@link Schema} object, or {@link #readGenericRecords(String)} which takes an Avro schema in a
@@ -71,26 +78,34 @@ import org.apache.beam.sdk.values.TypeDescriptors;
* schema. Likewise, to read a {@link PCollection} of filepatterns, apply {@link
* #readAllGenericRecords}.
*
- * <p>To read records from files whose schema is unknown at pipeline construction time or differs
- * between files, use {@link #parseGenericRecords} - in this case, you will need to specify a
- * parsing function for converting each {@link GenericRecord} into a value of your custom type.
- * Likewise, to read a {@link PCollection} of filepatterns with unknown schema, use {@link
- * #parseAllGenericRecords}.
- *
* <p>For example:
*
* <pre>{@code
* Pipeline p = ...;
*
- * // A simple Read of a local file (only runs locally):
+ * // Read Avro-generated classes from files on GCS
* PCollection<AvroAutoGenClass> records =
- * p.apply(AvroIO.read(AvroAutoGenClass.class).from("/path/to/file.avro"));
+ * p.apply(AvroIO.read(AvroAutoGenClass.class).from("gs://my_bucket/path/to/records-*.avro"));
*
- * // A Read from a GCS file (runs locally and using remote execution):
+ * // Read GenericRecords of the given schema from files on GCS
* Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
* PCollection<GenericRecord> records =
* p.apply(AvroIO.readGenericRecords(schema)
* .from("gs://my_bucket/path/to/records-*.avro"));
+ * }</pre>
+ *
+ * <h3>Reading records of an unknown schema</h3>
+ *
+ * <p>To read records from files whose schema is unknown at pipeline construction time or differs
+ * between files, use {@link #parseGenericRecords} - in this case, you will need to specify a
+ * parsing function for converting each {@link GenericRecord} into a value of your custom type.
+ * Likewise, to read a {@link PCollection} of filepatterns with unknown schema, use {@link
+ * #parseAllGenericRecords}.
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
*
* PCollection<Foo> records =
* p.apply(AvroIO.parseGenericRecords(new SerializableFunction<GenericRecord, Foo>() {
@@ -101,12 +116,7 @@ import org.apache.beam.sdk.values.TypeDescriptors;
* }));
* }</pre>
*
- * <p>If it is known that the filepattern will match a very large number of files (e.g. tens of
- * thousands or more), use {@link Read#withHintMatchesManyFiles} or {@link
- * Parse#withHintMatchesManyFiles} for better performance and scalability. Note that it may decrease
- * performance if the filepattern matches only a small number of files.
- *
- * <p>Reading from a {@link PCollection} of filepatterns:
+ * <h3>Reading from a {@link PCollection} of filepatterns</h3>
*
* <pre>{@code
* Pipeline p = ...;
@@ -120,6 +130,15 @@ import org.apache.beam.sdk.values.TypeDescriptors;
* filepatterns.apply(AvroIO.parseAllGenericRecords(new SerializableFunction...);
* }</pre>
*
+ * <h3>Reading a very large number of files</h3>
+ *
+ * <p>If it is known that the filepattern will match a very large number of files (e.g. tens of
+ * thousands or more), use {@link Read#withHintMatchesManyFiles} for better performance and
+ * scalability. Note that it may decrease performance if the filepattern matches only a small number
+ * of files.
+ *
+ * <h2>Writing Avro files</h2>
+ *
* <p>To write a {@link PCollection} to one or more Avro files, use {@link AvroIO.Write}, using
* {@code AvroIO.write().to(String)} to specify the output filename prefix. The default {@link
* DefaultFilenamePolicy} will use this prefix, in conjunction with a {@link ShardNameTemplate} (set
@@ -128,13 +147,11 @@ import org.apache.beam.sdk.values.TypeDescriptors;
* default write filename policy using {@link Write#to(FileBasedSink.FilenamePolicy)} to specify a
* custom file naming policy.
*
- * <p>By default, all input is put into the global window before writing. If per-window writes are
- * desired - for example, when using a streaming runner - {@link AvroIO.Write#withWindowedWrites()}
- * will cause windowing and triggering to be preserved. When producing windowed writes with a
- * streaming runner that supports triggers, the number of output shards must be set explicitly using
- * {@link AvroIO.Write#withNumShards(int)}; some runners may set this for you to a runner-chosen
- * value, so you may need not set it yourself. A {@link FileBasedSink.FilenamePolicy} must be set,
- * and unique windows and triggers must produce unique filenames.
+ * <p>By default, {@link AvroIO.Write} produces output files that are compressed using the {@link
+ * org.apache.avro.file.Codec CodecFactory.deflateCodec(6)}. This default can be changed or
+ * overridden using {@link AvroIO.Write#withCodec}.
+ *
+ * <h3>Writing specific or generic records</h3>
*
* <p>To write specific records, such as Avro-generated classes, use {@link #write(Class)}. To write
* {@link GenericRecord GenericRecords}, use either {@link #writeGenericRecords(Schema)} which takes
@@ -157,6 +174,18 @@ import org.apache.beam.sdk.values.TypeDescriptors;
* .withSuffix(".avro"));
* }</pre>
*
+ * <h3>Writing windowed or unbounded data</h3>
+ *
+ * <p>By default, all input is put into the global window before writing. If per-window writes are
+ * desired - for example, when using a streaming runner - {@link AvroIO.Write#withWindowedWrites()}
+ * will cause windowing and triggering to be preserved. When producing windowed writes with a
+ * streaming runner that supports triggers, the number of output shards must be set explicitly using
+ * {@link AvroIO.Write#withNumShards(int)}; some runners may set this for you to a runner-chosen
+ * value, so you may not need to set it yourself. A {@link FileBasedSink.FilenamePolicy} must be
+ * set, and unique windows and triggers must produce unique filenames.
+ *
+ * <h3>Writing data to multiple destinations</h3>
+ *
* <p>The following shows a more-complex example of AvroIO.Write usage, generating dynamic file
* destinations as well as a dynamic Avro schema per file. In this example, a PCollection of user
* events (e.g. actions on a website) is written out to Avro files. Each event contains the user id
@@ -201,10 +230,6 @@ import org.apache.beam.sdk.values.TypeDescriptors;
* events.apply("WriteAvros", AvroIO.<Integer>writeCustomTypeToGenericRecords()
* .to(new UserDynamicAvros()));
* }</pre>
- *
- * <p>By default, {@link AvroIO.Write} produces output files that are compressed using the {@link
- * org.apache.avro.file.Codec CodecFactory.deflateCodec(6)}. This default can be changed or
- * overridden using {@link AvroIO.Write#withCodec}.
*/
public class AvroIO {
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/84eb7f3a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 835008f..442e4d9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -56,6 +56,8 @@ import org.joda.time.Duration;
/**
* {@link PTransform}s for reading and writing text files.
*
+ * <h2>Reading text files</h2>
+ *
* <p>To read a {@link PCollection} from one or more text files, use {@code TextIO.read()} to
* instantiate a transform and use {@link TextIO.Read#from(String)} to specify the path of the
* file(s) to be read. Alternatively, if the filenames to be read are themselves in a {@link
@@ -64,6 +66,8 @@ import org.joda.time.Duration;
* <p>{@link #read} returns a {@link PCollection} of {@link String Strings}, each corresponding to
* one line of an input UTF-8 text file (split into lines delimited by '\n', '\r', or '\r\n').
*
+ * <h3>Filepattern expansion and watching</h3>
+ *
* <p>By default, the filepatterns are expanded only once. {@link Read#watchForNewFiles} and {@link
* ReadAll#watchForNewFiles} allow streaming of new files matching the filepattern(s).
*
@@ -81,11 +85,6 @@ import org.joda.time.Duration;
* PCollection<String> lines = p.apply(TextIO.read().from("/local/path/to/file.txt"));
* }</pre>
*
- * <p>If it is known that the filepattern will match a very large number of files (e.g. tens of
- * thousands or more), use {@link Read#withHintMatchesManyFiles} for better performance and
- * scalability. Note that it may decrease performance if the filepattern matches only a small number
- * of files.
- *
* <p>Example 2: reading a PCollection of filenames.
*
* <pre>{@code
@@ -113,6 +112,15 @@ import org.joda.time.Duration;
* afterTimeSinceNewOutput(Duration.standardHours(1))));
* }</pre>
*
+ * <h3>Reading a very large number of files</h3>
+ *
+ * <p>If it is known that the filepattern will match a very large number of files (e.g. tens of
+ * thousands or more), use {@link Read#withHintMatchesManyFiles} for better performance and
+ * scalability. Note that it may decrease performance if the filepattern matches only a small number
+ * of files.
+ *
+ * <h2>Writing text files</h2>
+ *
* <p>To write a {@link PCollection} to one or more text files, use {@code TextIO.write()}, using
* {@link TextIO.Write#to(String)} to specify the output prefix of the files to write.
*
@@ -130,6 +138,13 @@ import org.joda.time.Duration;
* .withWritableByteChannelFactory(FileBasedSink.CompressionType.GZIP));
* }</pre>
*
+ * <p>Any existing files with the same names as generated output files will be overwritten.
+ *
+ * <p>If you want better control over how filenames are generated than the default policy allows, a
+ * custom {@link FilenamePolicy} can also be set using {@link TextIO.Write#to(FilenamePolicy)}.
+ *
+ * <h3>Writing windowed or unbounded data</h3>
+ *
* <p>By default, all input is put into the global window before writing. If per-window writes are
* desired - for example, when using a streaming runner - {@link TextIO.Write#withWindowedWrites()}
* will cause windowing and triggering to be preserved. When producing windowed writes with a
@@ -140,8 +155,7 @@ import org.joda.time.Duration;
* for the window and the pane; W is expanded into the window text, and P into the pane; the default
* template will include both the window and the pane in the filename.
*
- * <p>If you want better control over how filenames are generated than the default policy allows, a
- * custom {@link FilenamePolicy} can also be set using {@link TextIO.Write#to(FilenamePolicy)}.
+ * <h3>Writing data to multiple destinations</h3>
*
* <p>TextIO also supports dynamic, value-dependent file destinations. The most general form of this
* is done via {@link TextIO.Write#to(DynamicDestinations)}. A {@link DynamicDestinations} class
@@ -166,8 +180,6 @@ import org.joda.time.Duration;
* }),
* new Params().withBaseFilename(baseDirectory + "/empty");
* }</pre>
- *
- * <p>Any existing files with the same names as generated output files will be overwritten.
*/
public class TextIO {
/**
[3/6] beam git commit: Adds EmptyMatchTreatment to AvroIO
Posted by jk...@apache.org.
Adds EmptyMatchTreatment to AvroIO
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/82b08523
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/82b08523
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/82b08523
Branch: refs/heads/master
Commit: 82b08523084aa6f20ea3c4d5b8b89cdbe0378060
Parents: 84eb7f3
Author: Eugene Kirpichov <ek...@gmail.com>
Authored: Wed Aug 16 14:40:52 2017 -0700
Committer: Eugene Kirpichov <ek...@gmail.com>
Committed: Wed Aug 30 11:55:18 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/AvroIO.java | 161 ++++++++++++++-----
.../java/org/apache/beam/sdk/io/AvroSource.java | 22 ++-
.../apache/beam/sdk/io/BlockBasedSource.java | 27 +++-
.../org/apache/beam/sdk/io/FileBasedSource.java | 4 +
4 files changed, 171 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/82b08523/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index d4a7cbb..9601a7d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -39,6 +39,7 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
@@ -69,6 +70,12 @@ import org.apache.beam.sdk.values.TypeDescriptors;
*
* <p>See {@link FileSystems} for information on supported file systems and filepatterns.
*
+ * <h3>Filepattern expansion and watching</h3>
+ *
+ * <p>By default, {@link #read} prohibits filepatterns that match no files, and {@link #readAll}
+ * allows them if the filepattern contains a glob wildcard character. Use {@link
+ * Read#withEmptyMatchTreatment} to configure this behavior.
+ *
* <h3>Reading records of a known schema</h3>
*
* <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}. To read
@@ -239,6 +246,7 @@ public class AvroIO {
*/
public static <T> Read<T> read(Class<T> recordClass) {
return new AutoValue_AvroIO_Read.Builder<T>()
+ .setEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW)
.setRecordClass(recordClass)
.setSchema(ReflectData.get().getSchema(recordClass))
.setHintMatchesManyFiles(false)
@@ -248,6 +256,7 @@ public class AvroIO {
/** Like {@link #read}, but reads each filepattern in the input {@link PCollection}. */
public static <T> ReadAll<T> readAll(Class<T> recordClass) {
return new AutoValue_AvroIO_ReadAll.Builder<T>()
+ .setEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD)
.setRecordClass(recordClass)
.setSchema(ReflectData.get().getSchema(recordClass))
// 64MB is a reasonable value that allows to amortize the cost of opening files,
@@ -260,6 +269,7 @@ public class AvroIO {
/** Reads Avro file(s) containing records of the specified schema. */
public static Read<GenericRecord> readGenericRecords(Schema schema) {
return new AutoValue_AvroIO_Read.Builder<GenericRecord>()
+ .setEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW)
.setRecordClass(GenericRecord.class)
.setSchema(schema)
.setHintMatchesManyFiles(false)
@@ -272,6 +282,7 @@ public class AvroIO {
*/
public static ReadAll<GenericRecord> readAllGenericRecords(Schema schema) {
return new AutoValue_AvroIO_ReadAll.Builder<GenericRecord>()
+ .setEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD)
.setRecordClass(GenericRecord.class)
.setSchema(schema)
.setDesiredBundleSizeBytes(64 * 1024 * 1024L)
@@ -300,6 +311,7 @@ public class AvroIO {
*/
public static <T> Parse<T> parseGenericRecords(SerializableFunction<GenericRecord, T> parseFn) {
return new AutoValue_AvroIO_Parse.Builder<T>()
+ .setEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW)
.setParseFn(parseFn)
.setHintMatchesManyFiles(false)
.build();
@@ -312,6 +324,7 @@ public class AvroIO {
public static <T> ParseAll<T> parseAllGenericRecords(
SerializableFunction<GenericRecord, T> parseFn) {
return new AutoValue_AvroIO_ParseAll.Builder<T>()
+ .setEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD)
.setParseFn(parseFn)
.setDesiredBundleSizeBytes(64 * 1024 * 1024L)
.build();
@@ -392,6 +405,7 @@ public class AvroIO {
@AutoValue
public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
@Nullable abstract ValueProvider<String> getFilepattern();
+ abstract EmptyMatchTreatment getEmptyMatchTreatment();
@Nullable abstract Class<T> getRecordClass();
@Nullable abstract Schema getSchema();
abstract boolean getHintMatchesManyFiles();
@@ -401,6 +415,7 @@ public class AvroIO {
@AutoValue.Builder
abstract static class Builder<T> {
abstract Builder<T> setFilepattern(ValueProvider<String> filepattern);
+ abstract Builder<T> setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment);
abstract Builder<T> setRecordClass(Class<T> recordClass);
abstract Builder<T> setSchema(Schema schema);
abstract Builder<T> setHintMatchesManyFiles(boolean hintManyFiles);
@@ -424,6 +439,13 @@ public class AvroIO {
}
/**
+ * Configures whether or not a filepattern matching no files is allowed.
+ */
+ public Read<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+ return toBuilder().setEmptyMatchTreatment(treatment).build();
+ }
+
+ /**
* Hints that the filepattern specified in {@link #from(String)} matches a very large number of
* files.
*
@@ -440,37 +462,48 @@ public class AvroIO {
public PCollection<T> expand(PBegin input) {
checkNotNull(getFilepattern(), "filepattern");
checkNotNull(getSchema(), "schema");
- if (getHintMatchesManyFiles()) {
- ReadAll<T> readAll =
- (getRecordClass() == GenericRecord.class)
- ? (ReadAll<T>) readAllGenericRecords(getSchema())
- : readAll(getRecordClass());
- return input
- .apply(Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
- .apply(readAll);
- } else {
- return input
- .getPipeline()
- .apply(
- "Read",
- org.apache.beam.sdk.io.Read.from(
- createSource(getFilepattern(), getRecordClass(), getSchema())));
+
+ if (!getHintMatchesManyFiles()) {
+ return input.apply(
+ "Read",
+ org.apache.beam.sdk.io.Read.from(
+ createSource(
+ getFilepattern(), getEmptyMatchTreatment(), getRecordClass(), getSchema())));
}
+ // All other cases go through ReadAll.
+
+ ReadAll<T> readAll =
+ (getRecordClass() == GenericRecord.class)
+ ? (ReadAll<T>) readAllGenericRecords(getSchema())
+ : readAll(getRecordClass());
+ readAll = readAll.withEmptyMatchTreatment(getEmptyMatchTreatment());
+ return input
+ .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
+ .apply("Via ReadAll", readAll);
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
- builder.addIfNotNull(
- DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"));
+ builder
+ .addIfNotNull(
+ DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"))
+ .add(
+ DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString())
+ .withLabel("Treatment of filepatterns that match no files"));
}
@SuppressWarnings("unchecked")
private static <T> AvroSource<T> createSource(
- ValueProvider<String> filepattern, Class<T> recordClass, Schema schema) {
+ ValueProvider<String> filepattern,
+ EmptyMatchTreatment emptyMatchTreatment,
+ Class<T> recordClass,
+ Schema schema) {
+ AvroSource<?> source =
+ AvroSource.from(filepattern).withEmptyMatchTreatment(emptyMatchTreatment);
return recordClass == GenericRecord.class
- ? (AvroSource<T>) AvroSource.from(filepattern).withSchema(schema)
- : AvroSource.from(filepattern).withSchema(recordClass);
+ ? (AvroSource<T>) source.withSchema(schema)
+ : source.withSchema(recordClass);
}
}
@@ -479,6 +512,7 @@ public class AvroIO {
/** Implementation of {@link #readAll}. */
@AutoValue
public abstract static class ReadAll<T> extends PTransform<PCollection<String>, PCollection<T>> {
+ abstract EmptyMatchTreatment getEmptyMatchTreatment();
@Nullable abstract Class<T> getRecordClass();
@Nullable abstract Schema getSchema();
abstract long getDesiredBundleSizeBytes();
@@ -487,6 +521,7 @@ public class AvroIO {
@AutoValue.Builder
abstract static class Builder<T> {
+ abstract Builder<T> setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment);
abstract Builder<T> setRecordClass(Class<T> recordClass);
abstract Builder<T> setSchema(Schema schema);
abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
@@ -494,6 +529,11 @@ public class AvroIO {
abstract ReadAll<T> build();
}
+ /** Like {@link Read#withEmptyMatchTreatment}. */
+ public ReadAll<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+ return toBuilder().setEmptyMatchTreatment(treatment).build();
+ }
+
@VisibleForTesting
ReadAll<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
@@ -502,24 +542,40 @@ public class AvroIO {
@Override
public PCollection<T> expand(PCollection<String> input) {
checkNotNull(getSchema(), "schema");
+ Match.Filepatterns matchFilepatterns =
+ Match.filepatterns().withEmptyMatchTreatment(getEmptyMatchTreatment());
+
return input
- .apply(Match.filepatterns())
+ .apply(matchFilepatterns)
.apply(
"Read all via FileBasedSource",
new ReadAllViaFileBasedSource<>(
SerializableFunctions.<String, Boolean>constant(true) /* isSplittable */,
getDesiredBundleSizeBytes(),
- new CreateSourceFn<>(getRecordClass(), getSchema().toString())))
+ new CreateSourceFn<>(
+ getEmptyMatchTreatment(), getRecordClass(), getSchema().toString())))
.setCoder(AvroCoder.of(getRecordClass(), getSchema()));
}
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ super.populateDisplayData(builder);
+ builder
+ .add(
+ DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString())
+ .withLabel("Treatment of filepatterns that match no files"));
+ }
}
private static class CreateSourceFn<T>
implements SerializableFunction<String, FileBasedSource<T>> {
+ private final EmptyMatchTreatment emptyMatchTreatment;
private final Class<T> recordClass;
private final Supplier<Schema> schemaSupplier;
- public CreateSourceFn(Class<T> recordClass, String jsonSchema) {
+ public CreateSourceFn(
+ EmptyMatchTreatment emptyMatchTreatment, Class<T> recordClass, String jsonSchema) {
+ this.emptyMatchTreatment = emptyMatchTreatment;
this.recordClass = recordClass;
this.schemaSupplier = AvroUtils.serializableSchemaSupplier(jsonSchema);
}
@@ -527,7 +583,7 @@ public class AvroIO {
@Override
public FileBasedSource<T> apply(String input) {
return Read.createSource(
- StaticValueProvider.of(input), recordClass, schemaSupplier.get());
+ StaticValueProvider.of(input), emptyMatchTreatment, recordClass, schemaSupplier.get());
}
}
@@ -537,6 +593,7 @@ public class AvroIO {
@AutoValue
public abstract static class Parse<T> extends PTransform<PBegin, PCollection<T>> {
@Nullable abstract ValueProvider<String> getFilepattern();
+ abstract EmptyMatchTreatment getEmptyMatchTreatment();
abstract SerializableFunction<GenericRecord, T> getParseFn();
@Nullable abstract Coder<T> getCoder();
abstract boolean getHintMatchesManyFiles();
@@ -546,6 +603,7 @@ public class AvroIO {
@AutoValue.Builder
abstract static class Builder<T> {
abstract Builder<T> setFilepattern(ValueProvider<String> filepattern);
+ abstract Builder<T> setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment);
abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn);
abstract Builder<T> setCoder(Coder<T> coder);
abstract Builder<T> setHintMatchesManyFiles(boolean hintMatchesManyFiles);
@@ -563,6 +621,11 @@ public class AvroIO {
return toBuilder().setFilepattern(filepattern).build();
}
+ /** Like {@link Read#withEmptyMatchTreatment}. */
+ public Parse<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+ return toBuilder().setEmptyMatchTreatment(treatment).build();
+ }
+
/** Sets a coder for the result of the parse function. */
public Parse<T> withCoder(Coder<T> coder) {
return toBuilder().setCoder(coder).build();
@@ -577,14 +640,20 @@ public class AvroIO {
public PCollection<T> expand(PBegin input) {
checkNotNull(getFilepattern(), "filepattern");
Coder<T> coder = inferCoder(getCoder(), getParseFn(), input.getPipeline().getCoderRegistry());
- if (getHintMatchesManyFiles()) {
- return input
- .apply(Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
- .apply(parseAllGenericRecords(getParseFn()).withCoder(getCoder()));
+
+ if (!getHintMatchesManyFiles()) {
+ return input.apply(
+ org.apache.beam.sdk.io.Read.from(
+ AvroSource.from(getFilepattern()).withParseFn(getParseFn(), coder)));
}
- return input.apply(
- org.apache.beam.sdk.io.Read.from(
- AvroSource.from(getFilepattern()).withParseFn(getParseFn(), coder)));
+ // All other cases go through ParseAllGenericRecords.
+ ParseAll<T> parseAll =
+ parseAllGenericRecords(getParseFn())
+ .withCoder(coder)
+ .withEmptyMatchTreatment(getEmptyMatchTreatment());
+ return input
+ .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
+ .apply("Via ParseAll", parseAll);
}
private static <T> Coder<T> inferCoder(
@@ -612,7 +681,10 @@ public class AvroIO {
builder
.addIfNotNull(
DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"))
- .add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function"));
+ .add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function"))
+ .add(
+ DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString())
+ .withLabel("Treatment of filepatterns that match no files"));
}
}
@@ -621,6 +693,7 @@ public class AvroIO {
/** Implementation of {@link #parseAllGenericRecords}. */
@AutoValue
public abstract static class ParseAll<T> extends PTransform<PCollection<String>, PCollection<T>> {
+ abstract EmptyMatchTreatment getEmptyMatchTreatment();
abstract SerializableFunction<GenericRecord, T> getParseFn();
@Nullable abstract Coder<T> getCoder();
abstract long getDesiredBundleSizeBytes();
@@ -629,6 +702,7 @@ public class AvroIO {
@AutoValue.Builder
abstract static class Builder<T> {
+ abstract Builder<T> setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment);
abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn);
abstract Builder<T> setCoder(Coder<T> coder);
abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
@@ -636,6 +710,11 @@ public class AvroIO {
abstract ParseAll<T> build();
}
+ /** Like {@link Read#withEmptyMatchTreatment}. */
+ public ParseAll<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+ return toBuilder().setEmptyMatchTreatment(treatment).build();
+ }
+
/** Specifies the coder for the result of the {@code parseFn}. */
public ParseAll<T> withCoder(Coder<T> coder) {
return toBuilder().setCoder(coder).build();
@@ -650,15 +729,21 @@ public class AvroIO {
public PCollection<T> expand(PCollection<String> input) {
final Coder<T> coder =
Parse.inferCoder(getCoder(), getParseFn(), input.getPipeline().getCoderRegistry());
- SerializableFunction<String, FileBasedSource<T>> createSource =
+ final SerializableFunction<GenericRecord, T> parseFn = getParseFn();
+ final EmptyMatchTreatment emptyMatchTreatment = getEmptyMatchTreatment();
+ final SerializableFunction<String, FileBasedSource<T>> createSource =
new SerializableFunction<String, FileBasedSource<T>>() {
@Override
public FileBasedSource<T> apply(String input) {
- return AvroSource.from(input).withParseFn(getParseFn(), coder);
+ return AvroSource.from(input)
+ .withParseFn(parseFn, coder)
+ .withEmptyMatchTreatment(emptyMatchTreatment);
}
};
+ Match.Filepatterns matchFilepatterns =
+ Match.filepatterns().withEmptyMatchTreatment(emptyMatchTreatment);
return input
- .apply(Match.filepatterns())
+ .apply(matchFilepatterns)
.apply(
"Parse all via FileBasedSource",
new ReadAllViaFileBasedSource<>(
@@ -671,7 +756,11 @@ public class AvroIO {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
- builder.add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function"));
+ builder
+ .add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function"))
+ .add(
+ DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString())
+ .withLabel("Treatment of filepatterns that match no files"));
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/82b08523/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
index 8dd3125..2600d76 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
@@ -57,6 +57,7 @@ import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -211,6 +212,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
public static AvroSource<GenericRecord> from(ValueProvider<String> fileNameOrPattern) {
return new AvroSource<>(
fileNameOrPattern,
+ EmptyMatchTreatment.DISALLOW,
DEFAULT_MIN_BUNDLE_SIZE,
readGenericRecordsWithSchema(null /* will need to be specified in withSchema */));
}
@@ -220,11 +222,20 @@ public class AvroSource<T> extends BlockBasedSource<T> {
return from(ValueProvider.StaticValueProvider.of(fileNameOrPattern));
}
+ public AvroSource<T> withEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment) {
+ return new AvroSource<T>(
+ getFileOrPatternSpecProvider(),
+ emptyMatchTreatment,
+ getMinBundleSize(),
+ mode);
+ }
+
/** Reads files containing records that conform to the given schema. */
public AvroSource<GenericRecord> withSchema(String schema) {
checkNotNull(schema, "schema");
return new AvroSource<>(
getFileOrPatternSpecProvider(),
+ getEmptyMatchTreatment(),
getMinBundleSize(),
readGenericRecordsWithSchema(schema));
}
@@ -240,6 +251,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
checkNotNull(clazz, "clazz");
return new AvroSource<>(
getFileOrPatternSpecProvider(),
+ getEmptyMatchTreatment(),
getMinBundleSize(),
readGeneratedClasses(clazz));
}
@@ -254,6 +266,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
checkNotNull(parseFn, "coder");
return new AvroSource<>(
getFileOrPatternSpecProvider(),
+ getEmptyMatchTreatment(),
getMinBundleSize(),
parseGenericRecords(parseFn, coder));
}
@@ -263,15 +276,17 @@ public class AvroSource<T> extends BlockBasedSource<T> {
* minBundleSize} and its use.
*/
public AvroSource<T> withMinBundleSize(long minBundleSize) {
- return new AvroSource<>(getFileOrPatternSpecProvider(), minBundleSize, mode);
+ return new AvroSource<>(
+ getFileOrPatternSpecProvider(), getEmptyMatchTreatment(), minBundleSize, mode);
}
/** Constructor for FILEPATTERN mode. */
private AvroSource(
ValueProvider<String> fileNameOrPattern,
+ EmptyMatchTreatment emptyMatchTreatment,
long minBundleSize,
Mode<T> mode) {
- super(fileNameOrPattern, minBundleSize);
+ super(fileNameOrPattern, emptyMatchTreatment, minBundleSize);
this.mode = mode;
}
@@ -463,7 +478,8 @@ public class AvroSource<T> extends BlockBasedSource<T> {
return new AvroSource<>(
getSingleFileMetadata(), getMinBundleSize(), getStartOffset(), getEndOffset(), mode);
case FILEPATTERN:
- return new AvroSource<>(getFileOrPatternSpecProvider(), getMinBundleSize(), mode);
+ return new AvroSource<>(
+ getFileOrPatternSpecProvider(), getEmptyMatchTreatment(), getMinBundleSize(), mode);
default:
throw new InvalidObjectException(
String.format("Unknown mode %s for AvroSource %s", getMode(), this));
http://git-wip-us.apache.org/repos/asf/beam/blob/82b08523/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
index 25e8483..ec4f4ad 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
@@ -63,18 +64,36 @@ import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
public abstract class BlockBasedSource<T> extends FileBasedSource<T> {
/**
* Creates a {@code BlockBasedSource} based on a file name or pattern. Subclasses must call this
- * constructor when creating a {@code BlockBasedSource} for a file pattern. See
- * {@link FileBasedSource} for more information.
+ * constructor when creating a {@code BlockBasedSource} for a file pattern. See {@link
+ * FileBasedSource} for more information.
+ */
+ public BlockBasedSource(
+ String fileOrPatternSpec, EmptyMatchTreatment emptyMatchTreatment, long minBundleSize) {
+ this(StaticValueProvider.of(fileOrPatternSpec), emptyMatchTreatment, minBundleSize);
+ }
+
+ /**
+ * Like {@link #BlockBasedSource(String, EmptyMatchTreatment, long)} but with a default {@link
+ * EmptyMatchTreatment} of {@link EmptyMatchTreatment#DISALLOW}.
*/
public BlockBasedSource(String fileOrPatternSpec, long minBundleSize) {
- super(StaticValueProvider.of(fileOrPatternSpec), minBundleSize);
+ this(StaticValueProvider.of(fileOrPatternSpec), minBundleSize);
}
/** Like {@link #BlockBasedSource(String, long)}. */
public BlockBasedSource(ValueProvider<String> fileOrPatternSpec, long minBundleSize) {
- super(fileOrPatternSpec, minBundleSize);
+ this(fileOrPatternSpec, EmptyMatchTreatment.DISALLOW, minBundleSize);
}
+ /** Like {@link #BlockBasedSource(String, EmptyMatchTreatment, long)}. */
+ public BlockBasedSource(
+ ValueProvider<String> fileOrPatternSpec,
+ EmptyMatchTreatment emptyMatchTreatment,
+ long minBundleSize) {
+ super(fileOrPatternSpec, emptyMatchTreatment, minBundleSize);
+ }
+
+
/**
* Creates a {@code BlockBasedSource} for a single file. Subclasses must call this constructor
* when implementing {@link BlockBasedSource#createForSubrangeOfFile}. See documentation in
http://git-wip-us.apache.org/repos/asf/beam/blob/82b08523/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
index f835fa4..dabda84 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
@@ -154,6 +154,10 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
return fileOrPatternSpec;
}
+ public final EmptyMatchTreatment getEmptyMatchTreatment() {
+ return emptyMatchTreatment;
+ }
+
public final Mode getMode() {
return mode;
}
[5/6] beam git commit: Fixes a findbugs error in Apex runner
Posted by jk...@apache.org.
Fixes a findbugs error in Apex runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6590aed4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6590aed4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6590aed4
Branch: refs/heads/master
Commit: 6590aed4091a1fbff75311afc45c3d3df1b80d38
Parents: f1f3987
Author: Eugene Kirpichov <ek...@gmail.com>
Authored: Wed Aug 16 17:50:25 2017 -0700
Committer: Eugene Kirpichov <ek...@gmail.com>
Committed: Wed Aug 30 11:55:19 2017 -0700
----------------------------------------------------------------------
.../beam/runners/apex/translation/utils/ApexStateInternals.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/6590aed4/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
index 18ea8e4..e23601d 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
@@ -431,7 +431,7 @@ public class ApexStateInternals<K> implements StateInternals {
/**
* Serializable state for internals (namespace to state tag to coded value).
*/
- private Map<Slice, Table<String, String, byte[]>> perKeyState = new HashMap<>();
+ private Map<Slice, HashBasedTable<String, String, byte[]>> perKeyState = new HashMap<>();
private final Coder<K> keyCoder;
private ApexStateInternalsFactory(Coder<K> keyCoder) {
@@ -451,7 +451,7 @@ public class ApexStateInternals<K> implements StateInternals {
} catch (CoderException e) {
throw new RuntimeException(e);
}
- Table<String, String, byte[]> stateTable = perKeyState.get(keyBytes);
+ HashBasedTable<String, String, byte[]> stateTable = perKeyState.get(keyBytes);
if (stateTable == null) {
stateTable = HashBasedTable.create();
perKeyState.put(keyBytes, stateTable);
[6/6] beam git commit: This closes #3725: [BEAM-2827] Introduces AvroIO.watchForNewFiles
Posted by jk...@apache.org.
This closes #3725: [BEAM-2827] Introduces AvroIO.watchForNewFiles
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5c2cab01
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5c2cab01
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5c2cab01
Branch: refs/heads/master
Commit: 5c2cab0179725ec433392b30efcfa2833b825a18
Parents: d64f2cc 6590aed
Author: Eugene Kirpichov <ek...@gmail.com>
Authored: Wed Aug 30 11:55:45 2017 -0700
Committer: Eugene Kirpichov <ek...@gmail.com>
Committed: Wed Aug 30 11:55:45 2017 -0700
----------------------------------------------------------------------
.../translation/utils/ApexStateInternals.java | 4 +-
.../java/org/apache/beam/sdk/io/AvroIO.java | 364 +++++++++++++++----
.../java/org/apache/beam/sdk/io/AvroSource.java | 22 +-
.../apache/beam/sdk/io/BlockBasedSource.java | 27 +-
.../org/apache/beam/sdk/io/FileBasedSource.java | 4 +
.../java/org/apache/beam/sdk/io/TextIO.java | 45 ++-
.../org/apache/beam/sdk/transforms/Watch.java | 42 +++
.../java/org/apache/beam/sdk/io/AvroIOTest.java | 94 ++++-
8 files changed, 505 insertions(+), 97 deletions(-)
----------------------------------------------------------------------