You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/04 23:44:15 UTC
[1/4] beam git commit: [BEAM-2512] Introduces
TextIO.watchForNewFiles() and Match
Repository: beam
Updated Branches:
refs/heads/master 8d337ff0e -> df36bd9d7
[BEAM-2512] Introduces TextIO.watchForNewFiles() and Match
Part of http://s.apache.org/textio-sdf, based on
http://s.apache.org/beam-watch-transform.
The Match transform can be useful for users who want to write their own
file-based connectors, or for advanced use cases such as watching for new
subdirectories to appear in a directory (using Match) and then watching
each of those subdirectories for new files and reading them
(using TextIO.watchForNewFiles()).
Additionally, this change finally makes it configurable whether
TextIO.read() and TextIO.readAll() allow filepatterns that match no files.
By default, read() disallows filepatterns that match no files (to preserve
the old behavior); readAll() allows them if the filepattern contains a
wildcard (a reasonable default that read() arguably should have had from
the beginning, but which can't be changed there now); and watchForNewFiles()
allows them unconditionally (because matching files might appear later).
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fe002c22
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fe002c22
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fe002c22
Branch: refs/heads/master
Commit: fe002c221602a543b99afd6db910a7a60b259fa4
Parents: db9aede
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Aug 3 14:44:35 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Aug 4 16:38:23 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/annotations/Experimental.java | 5 +-
.../java/org/apache/beam/sdk/io/AvroIO.java | 2 +
.../main/java/org/apache/beam/sdk/io/Match.java | 156 +++++++++++++++++++
.../beam/sdk/io/ReadAllViaFileBasedSource.java | 46 +++---
.../java/org/apache/beam/sdk/io/TextIO.java | 156 ++++++++++++++++---
.../org/apache/beam/sdk/transforms/DoFn.java | 11 +-
.../org/apache/beam/sdk/transforms/Watch.java | 16 +-
.../org/apache/beam/sdk/io/TextIOReadTest.java | 54 ++++++-
8 files changed, 384 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/fe002c22/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
index 8224ebb..80c4613 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
@@ -72,8 +72,9 @@ public @interface Experimental {
OUTPUT_TIME,
/**
- * <a href="https://s.apache.org/splittable-do-fn">Splittable DoFn</a>.
- * Do not use: API is unstable and runner support is incomplete.
+ * <a href="https://s.apache.org/splittable-do-fn">Splittable DoFn</a>. See <a
+ * href="https://beam.apache.org/documentation/runners/capability-matrix/">capability matrix</a>
+ * for runner support.
*/
SPLITTABLE_DO_FN,
http://git-wip-us.apache.org/repos/asf/beam/blob/fe002c22/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index cd5857c..653b806 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -478,6 +478,7 @@ public class AvroIO {
public PCollection<T> expand(PCollection<String> input) {
checkNotNull(getSchema(), "schema");
return input
+ .apply(Match.filepatterns())
.apply(
"Read all via FileBasedSource",
new ReadAllViaFileBasedSource<>(
@@ -632,6 +633,7 @@ public class AvroIO {
}
};
return input
+ .apply(Match.filepatterns())
.apply(
"Parse all via FileBasedSource",
new ReadAllViaFileBasedSource<>(
http://git-wip-us.apache.org/repos/asf/beam/blob/fe002c22/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Match.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Match.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Match.java
new file mode 100644
index 0000000..bb44fac
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Match.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollResult;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Matches each filepattern in a collection of filepatterns using {@link FileSystems#match}, and
+ * produces a collection of matched resources (both files and directories) as {@link Metadata}.
+ * Resources are not deduplicated between filepatterns, i.e. if the same resource matches multiple
+ * filepatterns, it will be produced multiple times.
+ *
+ * <p>By default, this transform matches each filepattern once and produces a bounded {@link
+ * PCollection}. To continuously watch each filepattern for new matches, use {@link
+ * Filepatterns#continuously(Duration, TerminationCondition)} - this will produce an unbounded
+ * {@link PCollection}.
+ *
+ * <p>By default, filepatterns matching no resources are treated according to {@link
+ * EmptyMatchTreatment#ALLOW_IF_WILDCARD}. To configure this behavior, use {@link
+ * Filepatterns#withEmptyMatchTreatment}.
+ */
+public class Match {
+ private static final Logger LOG = LoggerFactory.getLogger(Match.class);
+
+ /** See {@link Match}. */
+ public static Filepatterns filepatterns() {
+ return new AutoValue_Match_Filepatterns.Builder()
+ .setEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD)
+ .build();
+ }
+
+ /** Implementation of {@link #filepatterns}. */
+ @AutoValue
+ public abstract static class Filepatterns
+ extends PTransform<PCollection<String>, PCollection<Metadata>> {
+ abstract EmptyMatchTreatment getEmptyMatchTreatment();
+
+ @Nullable
+ abstract Duration getWatchInterval();
+
+ @Nullable
+ abstract TerminationCondition<String, ?> getWatchTerminationCondition();
+
+ abstract Builder toBuilder();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment);
+
+ abstract Builder setWatchInterval(Duration watchInterval);
+
+ abstract Builder setWatchTerminationCondition(TerminationCondition<String, ?> condition);
+
+ abstract Filepatterns build();
+ }
+
+ /**
+ * Sets whether or not filepatterns matching no files are allowed. When using {@link
+ * #continuously}, they are always allowed, and this parameter is ignored.
+ */
+ public Filepatterns withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+ return toBuilder().setEmptyMatchTreatment(treatment).build();
+ }
+
+ /**
+ * Continuously watches for new resources matching the filepattern, repeatedly matching it at
+ * the given interval, until the given termination condition is reached. The returned {@link
+ * PCollection} is unbounded.
+ *
+ * <p>This works only in runners supporting {@link Experimental.Kind#SPLITTABLE_DO_FN}.
+ *
+ * @see TerminationCondition
+ */
+ @Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
+ public Filepatterns continuously(
+ Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
+ return toBuilder()
+ .setWatchInterval(pollInterval)
+ .setWatchTerminationCondition(terminationCondition)
+ .build();
+ }
+
+ @Override
+ public PCollection<Metadata> expand(PCollection<String> input) {
+ if (getWatchInterval() == null) {
+ return input.apply("Match filepatterns", ParDo.of(new MatchFn(getEmptyMatchTreatment())));
+ } else {
+ return input
+ .apply(
+ "Continuously match filepatterns",
+ Watch.growthOf(new MatchPollFn())
+ .withPollInterval(getWatchInterval())
+ .withTerminationPerInput(getWatchTerminationCondition()))
+ .apply(Values.<Metadata>create());
+ }
+ }
+
+ private static class MatchFn extends DoFn<String, Metadata> {
+ private final EmptyMatchTreatment emptyMatchTreatment;
+
+ public MatchFn(EmptyMatchTreatment emptyMatchTreatment) {
+ this.emptyMatchTreatment = emptyMatchTreatment;
+ }
+
+ @ProcessElement
+ public void process(ProcessContext c) throws Exception {
+ String filepattern = c.element();
+ MatchResult match = FileSystems.match(filepattern, emptyMatchTreatment);
+ LOG.info("Matched {} files for pattern {}", match.metadata().size(), filepattern);
+ for (Metadata metadata : match.metadata()) {
+ c.output(metadata);
+ }
+ }
+ }
+
+ private static class MatchPollFn implements Watch.Growth.PollFn<String, Metadata> {
+ @Override
+ public PollResult<Metadata> apply(String input, Instant timestamp) throws Exception {
+ return PollResult.incomplete(
+ Instant.now(), FileSystems.match(input, EmptyMatchTreatment.ALLOW).metadata());
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/fe002c22/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
index 66aa41e..990f508 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
@@ -21,7 +21,8 @@ import static com.google.common.base.Preconditions.checkArgument;
import java.io.IOException;
import java.util.concurrent.ThreadLocalRandom;
-import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
@@ -33,10 +34,14 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
/**
- * Reads each filepattern in the input {@link PCollection} using given parameters for splitting
- * files into offset ranges and for creating a {@link FileBasedSource} for a file.
+ * Reads each file in the input {@link PCollection} of {@link Metadata} using given parameters for
+ * splitting files into offset ranges and for creating a {@link FileBasedSource} for a file. The
+ * input {@link PCollection} must not contain {@link ResourceId#isDirectory directories}.
+ *
+ * <p>To obtain the collection of {@link Metadata} from a filepattern, use {@link
+ * Match#filepatterns()}.
*/
-class ReadAllViaFileBasedSource<T> extends PTransform<PCollection<String>, PCollection<T>> {
+class ReadAllViaFileBasedSource<T> extends PTransform<PCollection<Metadata>, PCollection<T>> {
private final SerializableFunction<String, Boolean> isSplittable;
private final long desiredBundleSizeBytes;
private final SerializableFunction<String, FileBasedSource<T>> createSource;
@@ -51,13 +56,12 @@ class ReadAllViaFileBasedSource<T> extends PTransform<PCollection<String>, PColl
}
@Override
- public PCollection<T> expand(PCollection<String> input) {
+ public PCollection<T> expand(PCollection<Metadata> input) {
return input
- .apply("Expand glob", ParDo.of(new ExpandGlobFn()))
.apply(
"Split into ranges",
ParDo.of(new SplitIntoRangesFn(isSplittable, desiredBundleSizeBytes)))
- .apply("Reshuffle", new ReshuffleWithUniqueKey<KV<MatchResult.Metadata, OffsetRange>>())
+ .apply("Reshuffle", new ReshuffleWithUniqueKey<KV<Metadata, OffsetRange>>())
.apply("Read ranges", ParDo.of(new ReadFileRangesFn<T>(createSource)));
}
@@ -86,23 +90,7 @@ class ReadAllViaFileBasedSource<T> extends PTransform<PCollection<String>, PColl
}
}
- private static class ExpandGlobFn extends DoFn<String, MatchResult.Metadata> {
- @ProcessElement
- public void process(ProcessContext c) throws Exception {
- MatchResult match = FileSystems.match(c.element());
- checkArgument(
- match.status().equals(MatchResult.Status.OK),
- "Failed to match filepattern %s: %s",
- c.element(),
- match.status());
- for (MatchResult.Metadata metadata : match.metadata()) {
- c.output(metadata);
- }
- }
- }
-
- private static class SplitIntoRangesFn
- extends DoFn<MatchResult.Metadata, KV<MatchResult.Metadata, OffsetRange>> {
+ private static class SplitIntoRangesFn extends DoFn<Metadata, KV<Metadata, OffsetRange>> {
private final SerializableFunction<String, Boolean> isSplittable;
private final long desiredBundleSizeBytes;
@@ -114,7 +102,11 @@ class ReadAllViaFileBasedSource<T> extends PTransform<PCollection<String>, PColl
@ProcessElement
public void process(ProcessContext c) {
- MatchResult.Metadata metadata = c.element();
+ Metadata metadata = c.element();
+ checkArgument(
+ !metadata.resourceId().isDirectory(),
+ "Resource %s is a directory",
+ metadata.resourceId());
if (!metadata.isReadSeekEfficient()
|| !isSplittable.apply(metadata.resourceId().toString())) {
c.output(KV.of(metadata, new OffsetRange(0, metadata.sizeBytes())));
@@ -127,7 +119,7 @@ class ReadAllViaFileBasedSource<T> extends PTransform<PCollection<String>, PColl
}
}
- private static class ReadFileRangesFn<T> extends DoFn<KV<MatchResult.Metadata, OffsetRange>, T> {
+ private static class ReadFileRangesFn<T> extends DoFn<KV<Metadata, OffsetRange>, T> {
private final SerializableFunction<String, FileBasedSource<T>> createSource;
private ReadFileRangesFn(SerializableFunction<String, FileBasedSource<T>> createSource) {
@@ -136,7 +128,7 @@ class ReadAllViaFileBasedSource<T> extends PTransform<PCollection<String>, PColl
@ProcessElement
public void process(ProcessContext c) throws IOException {
- MatchResult.Metadata metadata = c.element().getKey();
+ Metadata metadata = c.element().getKey();
OffsetRange range = c.element().getValue();
FileBasedSource<T> source = createSource.apply(metadata.toString());
try (BoundedSource.BoundedReader<T> reader =
http://git-wip-us.apache.org/repos/asf/beam/blob/fe002c22/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 9a14ad9..612f5c5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
@@ -44,10 +45,12 @@ import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
+import org.joda.time.Duration;
/**
* {@link PTransform}s for reading and writing text files.
@@ -57,9 +60,16 @@ import org.apache.beam.sdk.values.PDone;
* file(s) to be read. Alternatively, if the filenames to be read are themselves in a {@link
* PCollection}, apply {@link TextIO#readAll()}.
*
- * <p>{@link TextIO.Read} returns a {@link PCollection} of {@link String Strings}, each
- * corresponding to one line of an input UTF-8 text file (split into lines delimited by '\n', '\r',
- * or '\r\n').
+ * <p>{@link #read} returns a {@link PCollection} of {@link String Strings}, each corresponding to
+ * one line of an input UTF-8 text file (split into lines delimited by '\n', '\r', or '\r\n').
+ *
+ * <p>By default, the filepatterns are expanded only once. {@link Read#watchForNewFiles} and {@link
+ * ReadAll#watchForNewFiles} allow streaming of new files matching the filepattern(s).
+ *
+ * <p>By default, {@link #read} prohibits filepatterns that match no files, and {@link #readAll}
+ * allows them in case the filepattern contains a glob wildcard character. Use {@link
+ * TextIO.Read#withEmptyMatchTreatment} and {@link TextIO.ReadAll#withEmptyMatchTreatment} to
+ * configure this behavior.
*
* <p>Example 1: reading a file or filepattern.
*
@@ -88,6 +98,20 @@ import org.apache.beam.sdk.values.PDone;
* PCollection<String> lines = filenames.apply(TextIO.readAll());
* }</pre>
*
+ * <p>Example 3: streaming new files matching a filepattern.
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * PCollection<String> lines = p.apply(TextIO.read()
+ * .from("/local/path/to/files/*")
+ * .watchForNewFiles(
+ * // Check for new files every minute
+ * Duration.standardMinutes(1),
+ * // Stop watching the filepattern if no new files appear within an hour
+ * afterTimeSinceNewOutput(Duration.standardHours(1))));
+ * }</pre>
+ *
* <p>To write a {@link PCollection} to one or more text files, use {@code TextIO.write()}, using
* {@link TextIO.Write#to(String)} to specify the output prefix of the files to write.
*
@@ -153,6 +177,7 @@ public class TextIO {
return new AutoValue_TextIO_Read.Builder()
.setCompressionType(CompressionType.AUTO)
.setHintMatchesManyFiles(false)
+ .setEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW)
.build();
}
@@ -173,6 +198,7 @@ public class TextIO {
// but is not so large as to exhaust a typical runner's maximum amount of output per
// ProcessElement call.
.setDesiredBundleSizeBytes(64 * 1024 * 1024L)
+ .setEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD)
.build();
}
@@ -219,7 +245,15 @@ public class TextIO {
public abstract static class Read extends PTransform<PBegin, PCollection<String>> {
@Nullable abstract ValueProvider<String> getFilepattern();
abstract CompressionType getCompressionType();
+
+ @Nullable
+ abstract Duration getWatchForNewFilesInterval();
+
+ @Nullable
+ abstract TerminationCondition getWatchForNewFilesTerminationCondition();
+
abstract boolean getHintMatchesManyFiles();
+ abstract EmptyMatchTreatment getEmptyMatchTreatment();
abstract Builder toBuilder();
@@ -227,7 +261,10 @@ public class TextIO {
abstract static class Builder {
abstract Builder setFilepattern(ValueProvider<String> filepattern);
abstract Builder setCompressionType(CompressionType compressionType);
+ abstract Builder setWatchForNewFilesInterval(Duration watchForNewFilesInterval);
+ abstract Builder setWatchForNewFilesTerminationCondition(TerminationCondition condition);
abstract Builder setHintMatchesManyFiles(boolean hintManyFiles);
+ abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment);
abstract Read build();
}
@@ -257,8 +294,7 @@ public class TextIO {
}
/**
- * Returns a new transform for reading from text files that's like this one but
- * reads from input sources using the specified compression type.
+ * Reads from input sources using the specified compression type.
*
* <p>If no compression type is specified, the default is {@link TextIO.CompressionType#AUTO}.
*/
@@ -267,6 +303,23 @@ public class TextIO {
}
/**
+ * Continuously watches for new files matching the filepattern, polling it at the given
+ * interval, until the given termination condition is reached. The returned {@link PCollection}
+ * is unbounded.
+ *
+ * <p>This works only in runners supporting {@link Kind#SPLITTABLE_DO_FN}.
+ *
+ * @see TerminationCondition
+ */
+ @Experimental(Kind.SPLITTABLE_DO_FN)
+ public Read watchForNewFiles(Duration pollInterval, TerminationCondition terminationCondition) {
+ return toBuilder()
+ .setWatchForNewFilesInterval(pollInterval)
+ .setWatchForNewFilesTerminationCondition(terminationCondition)
+ .build();
+ }
+
+ /**
* Hints that the filepattern specified in {@link #from(String)} matches a very large number of
* files.
*
@@ -279,20 +332,40 @@ public class TextIO {
return toBuilder().setHintMatchesManyFiles(true).build();
}
+ /**
+ * Configures whether or not a filepattern matching no files is allowed. When using {@link
+ * #watchForNewFiles}, it is always allowed and this parameter is ignored.
+ */
+ public Read withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+ return toBuilder().setEmptyMatchTreatment(treatment).build();
+ }
+
@Override
public PCollection<String> expand(PBegin input) {
checkNotNull(getFilepattern(), "need to set the filepattern of a TextIO.Read transform");
- return getHintMatchesManyFiles()
- ? input
- .apply(
- "Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
- .apply(readAll().withCompressionType(getCompressionType()))
- : input.apply("Read", org.apache.beam.sdk.io.Read.from(getSource()));
+ if (getWatchForNewFilesInterval() == null && !getHintMatchesManyFiles()) {
+ return input.apply("Read", org.apache.beam.sdk.io.Read.from(getSource()));
+ }
+ // All other cases go through ReadAll.
+ ReadAll readAll =
+ readAll()
+ .withCompressionType(getCompressionType())
+ .withEmptyMatchTreatment(getEmptyMatchTreatment());
+ if (getWatchForNewFilesInterval() != null) {
+ readAll =
+ readAll.watchForNewFiles(
+ getWatchForNewFilesInterval(), getWatchForNewFilesTerminationCondition());
+ }
+ return input
+ .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
+ .apply("Via ReadAll", readAll);
}
// Helper to create a source specific to the requested compression type.
protected FileBasedSource<String> getSource() {
- return wrapWithCompression(new TextSource(getFilepattern()), getCompressionType());
+ return wrapWithCompression(
+ new TextSource(getFilepattern(), getEmptyMatchTreatment()),
+ getCompressionType());
}
private static FileBasedSource<String> wrapWithCompression(
@@ -330,10 +403,17 @@ public class TextIO {
String filepatternDisplay = getFilepattern().isAccessible()
? getFilepattern().get() : getFilepattern().toString();
builder
- .add(DisplayData.item("compressionType", getCompressionType().toString())
- .withLabel("Compression Type"))
- .addIfNotNull(DisplayData.item("filePattern", filepatternDisplay)
- .withLabel("File Pattern"));
+ .add(
+ DisplayData.item("compressionType", getCompressionType().toString())
+ .withLabel("Compression Type"))
+ .addIfNotNull(
+ DisplayData.item("filePattern", filepatternDisplay).withLabel("File Pattern"))
+ .add(
+ DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString())
+ .withLabel("Treatment of filepatterns that match no files"))
+ .addIfNotNull(
+ DisplayData.item("watchForNewFilesInterval", getWatchForNewFilesInterval())
+ .withLabel("Interval to watch for new files"));
}
}
@@ -344,6 +424,14 @@ public class TextIO {
public abstract static class ReadAll
extends PTransform<PCollection<String>, PCollection<String>> {
abstract CompressionType getCompressionType();
+
+ @Nullable
+ abstract Duration getWatchForNewFilesInterval();
+
+ @Nullable
+ abstract TerminationCondition<String, ?> getWatchForNewFilesTerminationCondition();
+
+ abstract EmptyMatchTreatment getEmptyMatchTreatment();
abstract long getDesiredBundleSizeBytes();
abstract Builder toBuilder();
@@ -351,6 +439,10 @@ public class TextIO {
@AutoValue.Builder
abstract static class Builder {
abstract Builder setCompressionType(CompressionType compressionType);
+ abstract Builder setWatchForNewFilesInterval(Duration watchForNewFilesInterval);
+ abstract Builder setWatchForNewFilesTerminationCondition(
+ TerminationCondition<String, ?> condition);
+ abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment);
abstract Builder setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
abstract ReadAll build();
@@ -361,6 +453,21 @@ public class TextIO {
return toBuilder().setCompressionType(compressionType).build();
}
+ /** Same as {@link Read#withEmptyMatchTreatment}. */
+ public ReadAll withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+ return toBuilder().setEmptyMatchTreatment(treatment).build();
+ }
+
+ /** Same as {@link Read#watchForNewFiles(Duration, TerminationCondition)}. */
+ @Experimental(Kind.SPLITTABLE_DO_FN)
+ public ReadAll watchForNewFiles(
+ Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
+ return toBuilder()
+ .setWatchForNewFilesInterval(pollInterval)
+ .setWatchForNewFilesTerminationCondition(terminationCondition)
+ .build();
+ }
+
@VisibleForTesting
ReadAll withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
@@ -368,13 +475,21 @@ public class TextIO {
@Override
public PCollection<String> expand(PCollection<String> input) {
+ Match.Filepatterns matchFilepatterns =
+ Match.filepatterns().withEmptyMatchTreatment(getEmptyMatchTreatment());
+ if (getWatchForNewFilesInterval() != null) {
+ matchFilepatterns =
+ matchFilepatterns.continuously(
+ getWatchForNewFilesInterval(), getWatchForNewFilesTerminationCondition());
+ }
return input
+ .apply(matchFilepatterns)
.apply(
"Read all via FileBasedSource",
new ReadAllViaFileBasedSource<>(
new IsSplittableFn(getCompressionType()),
getDesiredBundleSizeBytes(),
- new CreateTextSourceFn(getCompressionType())))
+ new CreateTextSourceFn(getCompressionType(), getEmptyMatchTreatment())))
.setCoder(StringUtf8Coder.of());
}
@@ -390,15 +505,18 @@ public class TextIO {
private static class CreateTextSourceFn
implements SerializableFunction<String, FileBasedSource<String>> {
private final CompressionType compressionType;
+ private final EmptyMatchTreatment emptyMatchTreatment;
- private CreateTextSourceFn(CompressionType compressionType) {
+ private CreateTextSourceFn(
+ CompressionType compressionType, EmptyMatchTreatment emptyMatchTreatment) {
this.compressionType = compressionType;
+ this.emptyMatchTreatment = emptyMatchTreatment;
}
@Override
public FileBasedSource<String> apply(String input) {
return Read.wrapWithCompression(
- new TextSource(StaticValueProvider.of(input)), compressionType);
+ new TextSource(StaticValueProvider.of(input), emptyMatchTreatment), compressionType);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/fe002c22/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 37c6263..3e023db 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -524,12 +524,15 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
* <li>It must return {@code void}.
* </ul>
*
- * <h2>Splittable DoFn's (WARNING: work in progress, do not use)</h2>
+ * <h2>Splittable DoFn's</h2>
*
* <p>A {@link DoFn} is <i>splittable</i> if its {@link ProcessElement} method has a parameter
* whose type is a subtype of {@link RestrictionTracker}. This is an advanced feature and an
- * overwhelming majority of users will never need to write a splittable {@link DoFn}. Right now
- * the implementation of this feature is in progress and it's not ready for any use.
+ * overwhelming majority of users will never need to write a splittable {@link DoFn}.
+ *
+ * <p>Not all runners support Splittable DoFn. See the
+ * <a href="https://beam.apache.org/documentation/runners/capability-matrix/">capability
+ * matrix</a>.
*
* <p>See <a href="https://s.apache.org/splittable-do-fn">the proposal</a> for an overview of the
* involved concepts (<i>splittable DoFn</i>, <i>restriction</i>, <i>restriction tracker</i>).
@@ -558,8 +561,6 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
* </ul>
*
* <p>A non-splittable {@link DoFn} <i>must not</i> define any of these methods.
- *
- * <p>More documentation will be added when the feature becomes ready for general usage.
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
http://git-wip-us.apache.org/repos/asf/beam/blob/fe002c22/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
index fc6f18d..9da2408 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
@@ -38,7 +38,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
-import java.lang.reflect.TypeVariable;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
@@ -64,6 +63,8 @@ import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.sdk.values.TypeDescriptors.TypeVariableExtractor;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
@@ -554,14 +555,13 @@ public class Watch {
if (outputCoder == null) {
// If a coder was not specified explicitly, infer it from the OutputT type parameter
// of the PollFn.
- TypeDescriptor<?> superDescriptor =
- TypeDescriptor.of(getPollFn().getClass()).getSupertype(PollFn.class);
- TypeVariable typeVariable = superDescriptor.getTypeParameter("OutputT");
- @SuppressWarnings("unchecked")
- TypeDescriptor<OutputT> descriptor =
- (TypeDescriptor<OutputT>) superDescriptor.resolveType(typeVariable);
+ TypeDescriptor<OutputT> outputT =
+ TypeDescriptors.extractFromTypeParameters(
+ getPollFn(),
+ PollFn.class,
+ new TypeVariableExtractor<PollFn<InputT, OutputT>, OutputT>() {});
try {
- outputCoder = input.getPipeline().getCoderRegistry().getCoder(descriptor);
+ outputCoder = input.getPipeline().getCoderRegistry().getCoder(outputT);
} catch (CannotProvideCoderException e) {
throw new RuntimeException(
"Unable to infer coder for OutputT. Specify it explicitly using withOutputCoder().");
http://git-wip-us.apache.org/repos/asf/beam/blob/fe002c22/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
index 8ad6030..aa6090d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
@@ -25,6 +25,7 @@ import static org.apache.beam.sdk.io.TextIO.CompressionType.DEFLATE;
import static org.apache.beam.sdk.io.TextIO.CompressionType.GZIP;
import static org.apache.beam.sdk.io.TextIO.CompressionType.UNCOMPRESSED;
import static org.apache.beam.sdk.io.TextIO.CompressionType.ZIP;
+import static org.apache.beam.sdk.transforms.Watch.Growth.afterTimeSinceNewOutput;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue;
import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -63,6 +64,7 @@ import java.util.zip.ZipOutputStream;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
import org.apache.beam.sdk.io.TextIO.CompressionType;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
@@ -70,6 +72,7 @@ import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.UsesSplittableParDo;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -78,6 +81,7 @@ import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream;
+import org.joda.time.Duration;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -787,7 +791,8 @@ public class TextIOReadTest {
private TextSource prepareSource(byte[] data) throws IOException {
Path path = Files.createTempFile(tempFolder, "tempfile", "ext");
Files.write(path, data);
- return new TextSource(ValueProvider.StaticValueProvider.of(path.toString()));
+ return new TextSource(
+ ValueProvider.StaticValueProvider.of(path.toString()), EmptyMatchTreatment.DISALLOW);
}
@Test
@@ -872,4 +877,51 @@ public class TextIOReadTest {
PAssert.that(lines).containsInAnyOrder(Iterables.concat(TINY, TINY, LARGE, LARGE));
p.run();
}
+
+  @Test
+  @Category({NeedsRunner.class, UsesSplittableParDo.class})
+  public void testReadWatchForNewFiles() throws IOException, InterruptedException {
+    final Path basePath = tempFolder.resolve("readWatch");
+    // Throws (instead of returning false like File.mkdir()) if the directory cannot be created,
+    // so a setup problem surfaces here rather than as a confusing assertion failure later.
+    Files.createDirectories(basePath);
+
+    // Poll the directory every 100ms. The afterTimeSinceNewOutput(3s) termination condition is
+    // what is expected to end the watch once the files below have been seen, letting p.run()
+    // return.
+    PCollection<String> lines =
+        p.apply(
+            TextIO.read()
+                .from(basePath.resolve("*").toString())
+                // Make sure that compression type propagates into readAll()
+                .withCompressionType(ZIP)
+                .watchForNewFiles(
+                    Duration.millis(100), afterTimeSinceNewOutput(Duration.standardSeconds(3))));
+
+    // Writes three ZIP files in the background. The initial delay is meant to give the pipeline
+    // time to start watching; the 300ms gaps stagger the files across several polls while staying
+    // well inside the 3s termination window.
+    Thread writer =
+        new Thread() {
+          @Override
+          public void run() {
+            try {
+              Thread.sleep(1000);
+              writeToFile(
+                  Arrays.asList("a.1", "a.2"),
+                  basePath.resolve("fileA").toString(),
+                  CompressionType.ZIP);
+              Thread.sleep(300);
+              writeToFile(
+                  Arrays.asList("b.1", "b.2"),
+                  basePath.resolve("fileB").toString(),
+                  CompressionType.ZIP);
+              Thread.sleep(300);
+              writeToFile(
+                  Arrays.asList("c.1", "c.2"),
+                  basePath.resolve("fileC").toString(),
+                  CompressionType.ZIP);
+            } catch (IOException | InterruptedException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        };
+    writer.start();
+
+    PAssert.that(lines).containsInAnyOrder("a.1", "a.2", "b.1", "b.2", "c.1", "c.2");
+    try {
+      p.run();
+    } finally {
+      // Join even when the pipeline fails, so the writer never outlives this test while still
+      // writing into the shared temporary folder.
+      writer.join();
+    }
+  }
}
[3/4] beam git commit: Adds coders for boolean,
ResourceId and Metadata
Posted by jk...@apache.org.
Adds coders for boolean, ResourceId and Metadata
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5e43b238
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5e43b238
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5e43b238
Branch: refs/heads/master
Commit: 5e43b2388652f38a37ab3378a63ae88e6ad53ee3
Parents: 8d337ff
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Aug 3 14:42:07 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Aug 4 16:38:23 2017 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/coders/BooleanCoder.java | 59 ++++++++++++++++++
.../apache/beam/sdk/coders/CoderRegistry.java | 10 ++++
.../apache/beam/sdk/io/fs/MetadataCoder.java | 63 ++++++++++++++++++++
.../apache/beam/sdk/io/fs/ResourceIdCoder.java | 56 +++++++++++++++++
.../org/apache/beam/sdk/transforms/Watch.java | 7 ++-
5 files changed, 192 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5e43b238/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BooleanCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BooleanCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BooleanCoder.java
new file mode 100644
index 0000000..e7f7543
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BooleanCoder.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.coders;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * A {@link Coder} for {@link Boolean}.
+ *
+ * <p>Each value is encoded as a single byte: {@code 1} for {@code true}, {@code 0} for
+ * {@code false}. On decoding, any byte other than {@code 1} yields {@code false}.
+ */
+public class BooleanCoder extends AtomicCoder<Boolean> {
+  private static final ByteCoder BYTE_CODER = ByteCoder.of();
+
+  private static final BooleanCoder INSTANCE = new BooleanCoder();
+
+  /** Returns the singleton instance of {@link BooleanCoder}. */
+  public static BooleanCoder of() {
+    return INSTANCE;
+  }
+
+  /**
+   * Writes {@code value} as a single byte.
+   *
+   * @throws CoderException if {@code value} is {@code null}
+   */
+  @Override
+  public void encode(Boolean value, OutputStream os) throws IOException {
+    // Reject null explicitly: unboxing it below would otherwise fail with a bare
+    // NullPointerException instead of a coder error.
+    if (value == null) {
+      throw new CoderException("cannot encode a null Boolean");
+    }
+    BYTE_CODER.encode(value ? (byte) 1 : 0, os);
+  }
+
+  /** Reads one byte and returns {@code true} only if it equals {@code 1}. */
+  @Override
+  public Boolean decode(InputStream is) throws IOException {
+    return BYTE_CODER.decode(is) == 1;
+  }
+
+  @Override
+  public boolean consistentWithEquals() {
+    return true;
+  }
+
+  @Override
+  public boolean isRegisterByteSizeObserverCheap(Boolean value) {
+    // The encoded size is a constant, so it can be reported without encoding the value.
+    return true;
+  }
+
+  /** Every value occupies exactly one byte. */
+  @Override
+  protected long getEncodedElementByteSize(Boolean value) throws Exception {
+    return 1;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e43b238/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
index 48389b1..c335bda 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
@@ -43,6 +43,10 @@ import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.CannotProvideCoderException.ReasonCode;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.MetadataCoder;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.io.fs.ResourceIdCoder;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.util.CoderUtils;
@@ -89,6 +93,8 @@ public class CoderRegistry {
private CommonTypes() {
ImmutableMap.Builder<Class<?>, CoderProvider> builder = ImmutableMap.builder();
+ builder.put(Boolean.class,
+ CoderProviders.fromStaticMethods(Boolean.class, BooleanCoder.class));
builder.put(Byte.class,
CoderProviders.fromStaticMethods(Byte.class, ByteCoder.class));
builder.put(BitSet.class,
@@ -109,6 +115,10 @@ public class CoderRegistry {
CoderProviders.fromStaticMethods(Long.class, VarLongCoder.class));
builder.put(Map.class,
CoderProviders.fromStaticMethods(Map.class, MapCoder.class));
+ builder.put(Metadata.class,
+ CoderProviders.fromStaticMethods(Metadata.class, MetadataCoder.class));
+ builder.put(ResourceId.class,
+ CoderProviders.fromStaticMethods(ResourceId.class, ResourceIdCoder.class));
builder.put(Set.class,
CoderProviders.fromStaticMethods(Set.class, SetCoder.class));
builder.put(String.class,
http://git-wip-us.apache.org/repos/asf/beam/blob/5e43b238/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java
new file mode 100644
index 0000000..5c9c4d7
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fs;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+
+/**
+ * A {@link Coder} for {@link Metadata}.
+ *
+ * <p>A value is encoded as its {@link ResourceId} (using {@link ResourceIdCoder}), followed by
+ * {@link Metadata#isReadSeekEfficient()} as a variable-length int ({@code 1} or {@code 0}),
+ * followed by {@link Metadata#sizeBytes()} as a variable-length long.
+ */
+public class MetadataCoder extends AtomicCoder<Metadata> {
+  private static final ResourceIdCoder RESOURCE_ID_CODER = ResourceIdCoder.of();
+  private static final VarIntCoder INT_CODER = VarIntCoder.of();
+  private static final VarLongCoder LONG_CODER = VarLongCoder.of();
+
+  private static final MetadataCoder INSTANCE = new MetadataCoder();
+
+  /** Returns the singleton instance of {@link MetadataCoder}. */
+  public static MetadataCoder of() {
+    // The coder holds no per-instance state, so all callers can share a single instance.
+    return INSTANCE;
+  }
+
+  @Override
+  public void encode(Metadata value, OutputStream os) throws IOException {
+    RESOURCE_ID_CODER.encode(value.resourceId(), os);
+    INT_CODER.encode(value.isReadSeekEfficient() ? 1 : 0, os);
+    LONG_CODER.encode(value.sizeBytes(), os);
+  }
+
+  @Override
+  public Metadata decode(InputStream is) throws IOException {
+    // Fields are read back in exactly the order encode() writes them.
+    ResourceId resourceId = RESOURCE_ID_CODER.decode(is);
+    boolean isReadSeekEfficient = INT_CODER.decode(is) == 1;
+    long sizeBytes = LONG_CODER.decode(is);
+    return Metadata.builder()
+        .setResourceId(resourceId)
+        .setIsReadSeekEfficient(isReadSeekEfficient)
+        .setSizeBytes(sizeBytes)
+        .build();
+  }
+
+  /**
+   * Returns {@code true}.
+   *
+   * <p>NOTE(review): this assumes {@link Metadata} equality is determined by exactly the three
+   * fields encoded here; revisit if {@link Metadata} gains fields that this coder does not encode.
+   */
+  @Override
+  public boolean consistentWithEquals() {
+    return true;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e43b238/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceIdCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceIdCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceIdCoder.java
new file mode 100644
index 0000000..d7649c0
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceIdCoder.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fs;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.BooleanCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.FileSystems;
+
+/**
+ * A {@link Coder} for {@link ResourceId}.
+ *
+ * <p>A value is encoded as its {@link ResourceId#toString()} spec followed by its
+ * {@link ResourceId#isDirectory()} flag, and is decoded with
+ * {@link FileSystems#matchNewResource(String, boolean)}.
+ */
+public class ResourceIdCoder extends AtomicCoder<ResourceId> {
+  private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+  private static final Coder<Boolean> BOOL_CODER = BooleanCoder.of();
+
+  private static final ResourceIdCoder INSTANCE = new ResourceIdCoder();
+
+  /** Returns the singleton instance of {@link ResourceIdCoder}. */
+  public static ResourceIdCoder of() {
+    // The coder holds no per-instance state, so all callers can share a single instance.
+    return INSTANCE;
+  }
+
+  @Override
+  public void encode(ResourceId value, OutputStream os) throws IOException {
+    STRING_CODER.encode(value.toString(), os);
+    BOOL_CODER.encode(value.isDirectory(), os);
+  }
+
+  @Override
+  public ResourceId decode(InputStream is) throws IOException {
+    String spec = STRING_CODER.decode(is);
+    boolean isDirectory = BOOL_CODER.decode(is);
+    // NOTE(review): resolution goes through FileSystems, so the decoding process presumably needs
+    // the file system for the spec's scheme to be registered -- confirm.
+    return FileSystems.matchNewResource(spec, isDirectory);
+  }
+
+  /**
+   * Returns {@code true}.
+   *
+   * <p>NOTE(review): assumes two {@link ResourceId}s are equal exactly when their specs and
+   * directory flags are equal.
+   */
+  @Override
+  public boolean consistentWithEquals() {
+    return true;
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/5e43b238/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
index b21eb62..fc6f18d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
@@ -47,6 +47,7 @@ import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.BooleanCoder;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.DurationCoder;
@@ -958,7 +959,7 @@ public class Watch {
return new GrowthStateCoder<>(outputCoder, terminationStateCoder);
}
- private static final Coder<Integer> INT_CODER = VarIntCoder.of();
+ private static final Coder<Boolean> BOOLEAN_CODER = BooleanCoder.of();
private static final Coder<Instant> INSTANT_CODER = NullableCoder.of(InstantCoder.of());
private static final Coder<HashCode> HASH_CODE_CODER = HashCode128Coder.of();
@@ -980,7 +981,7 @@ public class Watch {
throws IOException {
completedCoder.encode(value.completed, os);
pendingCoder.encode(value.pending, os);
- INT_CODER.encode(value.isOutputComplete ? 1 : 0, os);
+ BOOLEAN_CODER.encode(value.isOutputComplete, os);
terminationStateCoder.encode(value.terminationState, os);
INSTANT_CODER.encode(value.pollWatermark, os);
}
@@ -989,7 +990,7 @@ public class Watch {
public GrowthState<OutputT, TerminationStateT> decode(InputStream is) throws IOException {
Map<HashCode, Instant> completed = completedCoder.decode(is);
List<TimestampedValue<OutputT>> pending = pendingCoder.decode(is);
- boolean isOutputComplete = (INT_CODER.decode(is) == 1);
+ boolean isOutputComplete = BOOLEAN_CODER.decode(is);
TerminationStateT terminationState = terminationStateCoder.decode(is);
Instant pollWatermark = INSTANT_CODER.decode(is);
return new GrowthState<>(
[4/4] beam git commit: This closes #3607: [BEAM-2512] Introduces
TextIO.watchForNewFiles() and the Match transform
Posted by jk...@apache.org.
This closes #3607: [BEAM-2512] Introduces TextIO.watchForNewFiles() and the Match transform
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/df36bd9d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/df36bd9d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/df36bd9d
Branch: refs/heads/master
Commit: df36bd9d7979bd6956ec299a6f63d5a0237d031a
Parents: 8d337ff fe002c2
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Aug 4 16:38:39 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Aug 4 16:38:39 2017 -0700
----------------------------------------------------------------------
.../beam/sdk/annotations/Experimental.java | 5 +-
.../apache/beam/sdk/coders/BooleanCoder.java | 59 +++++++
.../apache/beam/sdk/coders/CoderRegistry.java | 10 ++
.../java/org/apache/beam/sdk/io/AvroIO.java | 2 +
.../org/apache/beam/sdk/io/FileBasedSource.java | 52 ++++---
.../org/apache/beam/sdk/io/FileSystems.java | 46 ++++++
.../main/java/org/apache/beam/sdk/io/Match.java | 156 +++++++++++++++++++
.../beam/sdk/io/ReadAllViaFileBasedSource.java | 46 +++---
.../java/org/apache/beam/sdk/io/TextIO.java | 156 ++++++++++++++++---
.../java/org/apache/beam/sdk/io/TextSource.java | 7 +-
.../beam/sdk/io/fs/EmptyMatchTreatment.java | 46 ++++++
.../org/apache/beam/sdk/io/fs/MatchResult.java | 5 +-
.../apache/beam/sdk/io/fs/MetadataCoder.java | 63 ++++++++
.../apache/beam/sdk/io/fs/ResourceIdCoder.java | 56 +++++++
.../org/apache/beam/sdk/transforms/DoFn.java | 11 +-
.../org/apache/beam/sdk/transforms/Watch.java | 23 +--
.../apache/beam/sdk/io/FileBasedSourceTest.java | 51 ++++++
.../org/apache/beam/sdk/io/TextIOReadTest.java | 54 ++++++-
18 files changed, 756 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
[2/4] beam git commit: Introduces EmptyMatchTreatment parameter to
FileSystems.match()
Posted by jk...@apache.org.
Introduces EmptyMatchTreatment parameter to FileSystems.match()
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/db9aede2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/db9aede2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/db9aede2
Branch: refs/heads/master
Commit: db9aede289f8546bb30113353f07aa75daa83eba
Parents: 5e43b23
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Aug 3 14:43:48 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Aug 4 16:38:23 2017 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/io/FileBasedSource.java | 52 ++++++++++----------
.../org/apache/beam/sdk/io/FileSystems.java | 46 +++++++++++++++++
.../java/org/apache/beam/sdk/io/TextSource.java | 7 ++-
.../beam/sdk/io/fs/EmptyMatchTreatment.java | 46 +++++++++++++++++
.../org/apache/beam/sdk/io/fs/MatchResult.java | 5 +-
.../apache/beam/sdk/io/FileBasedSourceTest.java | 51 +++++++++++++++++++
6 files changed, 180 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/db9aede2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
index d4413c9..7f865de 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
@@ -23,19 +23,17 @@ import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.ListIterator;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
-import org.apache.beam.sdk.io.fs.MatchResult.Status;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
@@ -68,6 +66,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
private static final Logger LOG = LoggerFactory.getLogger(FileBasedSource.class);
private final ValueProvider<String> fileOrPatternSpec;
+ private final EmptyMatchTreatment emptyMatchTreatment;
@Nullable private MatchResult.Metadata singleFileMetadata;
private final Mode mode;
@@ -80,15 +79,28 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
}
/**
- * Create a {@code FileBaseSource} based on a file or a file pattern specification.
+ * Create a {@code FileBasedSource} based on a file or a file pattern specification, with the given
+ * strategy for treating filepatterns that do not match any files.
*/
- protected FileBasedSource(ValueProvider<String> fileOrPatternSpec, long minBundleSize) {
+ protected FileBasedSource(
+ ValueProvider<String> fileOrPatternSpec,
+ EmptyMatchTreatment emptyMatchTreatment,
+ long minBundleSize) {
super(0, Long.MAX_VALUE, minBundleSize);
- mode = Mode.FILEPATTERN;
+ this.mode = Mode.FILEPATTERN;
+ this.emptyMatchTreatment = emptyMatchTreatment;
this.fileOrPatternSpec = fileOrPatternSpec;
}
/**
+ * Like {@link #FileBasedSource(ValueProvider, EmptyMatchTreatment, long)}, but uses the default
+ * value of {@link EmptyMatchTreatment#DISALLOW}.
+ */
+ protected FileBasedSource(ValueProvider<String> fileOrPatternSpec, long minBundleSize) {
+ this(fileOrPatternSpec, EmptyMatchTreatment.DISALLOW, minBundleSize);
+ }
+
+ /**
* Create a {@code FileBasedSource} based on a single file. This constructor must be used when
* creating a new {@code FileBasedSource} for a subrange of a single file.
* Additionally, this constructor must be used to create new {@code FileBasedSource}s when
@@ -110,6 +122,9 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
mode = Mode.SINGLE_FILE_OR_SUBRANGE;
this.singleFileMetadata = checkNotNull(fileMetadata, "fileMetadata");
this.fileOrPatternSpec = StaticValueProvider.of(fileMetadata.resourceId().toString());
+
+ // This field will be unused in this mode.
+ this.emptyMatchTreatment = null;
}
/**
@@ -204,14 +219,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
if (mode == Mode.FILEPATTERN) {
long totalSize = 0;
- List<MatchResult> inputs = FileSystems.match(Collections.singletonList(fileOrPattern));
- MatchResult result = Iterables.getOnlyElement(inputs);
- checkArgument(
- result.status() == Status.OK,
- "Error matching the pattern or glob %s: status %s",
- fileOrPattern,
- result.status());
- List<Metadata> allMatches = result.metadata();
+ List<Metadata> allMatches = FileSystems.match(fileOrPattern, emptyMatchTreatment).metadata();
for (Metadata metadata : allMatches) {
totalSize += metadata.sizeBytes();
}
@@ -254,9 +262,8 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
if (mode == Mode.FILEPATTERN) {
long startTime = System.currentTimeMillis();
- List<Metadata> expandedFiles = FileBasedSource.expandFilePattern(fileOrPattern);
- checkArgument(!expandedFiles.isEmpty(),
- "Unable to find any files matching %s", fileOrPattern);
+ List<Metadata> expandedFiles =
+ FileSystems.match(fileOrPattern, emptyMatchTreatment).metadata();
List<FileBasedSource<T>> splitResults = new ArrayList<>(expandedFiles.size());
for (Metadata metadata : expandedFiles) {
FileBasedSource<T> split = createForSubrangeOfFile(metadata, 0, metadata.sizeBytes());
@@ -327,7 +334,9 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
if (mode == Mode.FILEPATTERN) {
long startTime = System.currentTimeMillis();
- List<Metadata> fileMetadata = FileBasedSource.expandFilePattern(fileOrPattern);
+ List<Metadata> fileMetadata =
+ FileSystems.match(fileOrPattern, emptyMatchTreatment).metadata();
+ LOG.info("Matched {} files for pattern {}", fileMetadata.size(), fileOrPattern);
List<FileBasedReader<T>> fileReaders = new ArrayList<>();
for (Metadata metadata : fileMetadata) {
long endOffset = metadata.sizeBytes();
@@ -389,13 +398,6 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
return metadata.sizeBytes();
}
- private static List<Metadata> expandFilePattern(String fileOrPatternSpec) throws IOException {
- MatchResult matches =
- Iterables.getOnlyElement(FileSystems.match(Collections.singletonList(fileOrPatternSpec)));
- LOG.info("Matched {} files for pattern {}", matches.metadata().size(), fileOrPatternSpec);
- return ImmutableList.copyOf(matches.metadata());
- }
-
/**
* A {@link Source.Reader reader} that implements code common to readers of
* {@code FileBasedSource}s.
http://git-wip-us.apache.org/repos/asf/beam/blob/db9aede2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
index bd4668f..96394b8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
@@ -54,6 +54,7 @@ import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.fs.CreateOptions;
import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.io.fs.MatchResult.Status;
@@ -72,6 +73,8 @@ public class FileSystems {
public static final String DEFAULT_SCHEME = "file";
private static final Pattern FILE_SCHEME_PATTERN =
Pattern.compile("(?<scheme>[a-zA-Z][-a-zA-Z0-9+.]*):.*");
+ private static final Pattern GLOB_PATTERN =
+ Pattern.compile("[*?{}]");
private static final AtomicReference<Map<String, FileSystem>> SCHEME_TO_FILESYSTEM =
new AtomicReference<Map<String, FileSystem>>(
@@ -79,6 +82,11 @@ public class FileSystems {
/********************************** METHODS FOR CLIENT **********************************/
+ /** Checks whether the given spec contains a glob wildcard character. */
+ public static boolean hasGlobWildcard(String spec) {
+ return GLOB_PATTERN.matcher(spec).find();
+ }
+
/**
* This is the entry point to convert user-provided specs to {@link ResourceId ResourceIds}.
* Callers should use {@link #match} to resolve users specs ambiguities before
@@ -102,6 +110,9 @@ public class FileSystems {
* <p>In case the spec schemes don't match any known {@link FileSystem} implementations,
* FileSystems will attempt to use {@link LocalFileSystem} to resolve a path.
*
+ * <p>Specs that do not match any resources are treated according to
+ * {@link EmptyMatchTreatment#DISALLOW}.
+ *
* @return {@code List<MatchResult>} in the same order of the input specs.
*
* @throws IllegalArgumentException if specs are invalid -- empty or have different schemes.
@@ -114,6 +125,17 @@ public class FileSystems {
return getFileSystemInternal(getOnlyScheme(specs)).match(specs);
}
+ /** Like {@link #match(List)}, but with a configurable {@link EmptyMatchTreatment}. */
+ public static List<MatchResult> match(List<String> specs, EmptyMatchTreatment emptyMatchTreatment)
+ throws IOException {
+ List<MatchResult> matches = getFileSystemInternal(getOnlyScheme(specs)).match(specs);
+ List<MatchResult> res = Lists.newArrayListWithExpectedSize(matches.size());
+ for (int i = 0; i < matches.size(); i++) {
+ res.add(maybeAdjustEmptyMatchResult(specs.get(i), matches.get(i), emptyMatchTreatment));
+ }
+ return res;
+ }
+
/**
* Like {@link #match(List)}, but for a single resource specification.
@@ -130,6 +152,30 @@ public class FileSystems {
matches);
return matches.get(0);
}
+
+  /**
+   * Like {@link #match(String)}, but with a configurable {@link EmptyMatchTreatment} applied to
+   * the single result.
+   */
+  public static MatchResult match(String spec, EmptyMatchTreatment emptyMatchTreatment)
+      throws IOException {
+    return maybeAdjustEmptyMatchResult(spec, match(spec), emptyMatchTreatment);
+  }
+
+  /**
+   * Applies {@code emptyMatchTreatment} to the result of matching {@code spec}.
+   *
+   * <p>A result counts as an empty match if its status is {@link Status#NOT_FOUND}, or if it is
+   * {@link Status#OK} with no metadata (a {@link FileSystem} may report an unmatched spec either
+   * way). If the treatment allows an empty match for this spec, an {@code OK} result with no
+   * metadata is returned. Otherwise a {@code NOT_FOUND} result is returned: the file system's own
+   * one if it reported {@code NOT_FOUND}, or a new one carrying a {@link
+   * java.io.FileNotFoundException} if it reported an empty {@code OK}. Non-empty results are
+   * returned unchanged.
+   */
+  private static MatchResult maybeAdjustEmptyMatchResult(
+      String spec, MatchResult res, EmptyMatchTreatment emptyMatchTreatment)
+      throws IOException {
+    boolean notFound = res.status() == Status.NOT_FOUND;
+    // Without this, an empty OK result would bypass DISALLOW and callers that only read
+    // metadata() (e.g. FileBasedSource) would silently see zero files.
+    boolean okButEmpty = res.status() == Status.OK && res.metadata().isEmpty();
+    if (!notFound && !okButEmpty) {
+      return res;
+    }
+    boolean emptyAllowed =
+        emptyMatchTreatment == EmptyMatchTreatment.ALLOW
+            || (FileSystems.hasGlobWildcard(spec)
+                && emptyMatchTreatment == EmptyMatchTreatment.ALLOW_IF_WILDCARD);
+    if (emptyAllowed) {
+      return MatchResult.create(Status.OK, Collections.<Metadata>emptyList());
+    }
+    if (notFound) {
+      // Keep the file system's own NOT_FOUND result and its error details.
+      return res;
+    }
+    return MatchResult.create(
+        Status.NOT_FOUND, new java.io.FileNotFoundException("No files matched spec: " + spec));
+  }
+
/**
* Returns the {@link Metadata} for a single file resource. Expects a resource specification
* {@code spec} that matches a single result.
http://git-wip-us.apache.org/repos/asf/beam/blob/db9aede2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
index 86c73d5..29188dc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
@@ -28,6 +28,7 @@ import java.nio.channels.SeekableByteChannel;
import java.util.NoSuchElementException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
@@ -48,7 +49,11 @@ import org.apache.beam.sdk.options.ValueProvider;
@VisibleForTesting
class TextSource extends FileBasedSource<String> {
+  /** Creates a source over {@code fileSpec}; a filepattern that matches no files is rejected. */
TextSource(ValueProvider<String> fileSpec) {
- super(fileSpec, 1L);
+    this(fileSpec, EmptyMatchTreatment.DISALLOW);
+  }
+
+  /**
+   * Creates a source over {@code fileSpec}. A filepattern that matches no files is handled
+   * according to {@code treatment}.
+   */
+  TextSource(ValueProvider<String> fileSpec, EmptyMatchTreatment treatment) {
+    // 1L is forwarded to FileBasedSource as the minimum bundle size.
+    super(fileSpec, treatment, 1L);
}
private TextSource(MatchResult.Metadata metadata, long start, long end) {
http://git-wip-us.apache.org/repos/asf/beam/blob/db9aede2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/EmptyMatchTreatment.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/EmptyMatchTreatment.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/EmptyMatchTreatment.java
new file mode 100644
index 0000000..8e12993
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/EmptyMatchTreatment.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fs;
+
+import org.apache.beam.sdk.io.fs.MatchResult.Status;
+
+/**
+ * Options for allowing or disallowing filepatterns that match no resources in {@link
+ * org.apache.beam.sdk.io.FileSystems#match}.
+ *
+ * <p>A treatment only affects a match that would otherwise be reported as {@link
+ * Status#NOT_FOUND}. If the treatment allows the empty match, the result is replaced by one whose
+ * {@link MatchResult#status} is {@link Status#OK} and whose {@link MatchResult#metadata} is an
+ * empty list. Otherwise the {@link Status#NOT_FOUND} result is returned unchanged.
+ */
+public enum EmptyMatchTreatment {
+ /**
+ * Filepatterns matching no resources are always allowed. For such a filepattern, {@link
+ * MatchResult#status} will be {@link Status#OK} and {@link MatchResult#metadata} will return an
+ * empty list.
+ */
+ ALLOW,
+
+ /**
+ * Filepatterns matching no resources are disallowed. For such a filepattern, {@link
+ * MatchResult#status} will be {@link Status#NOT_FOUND} and {@link MatchResult#metadata} will
+ * throw a {@link java.io.FileNotFoundException}.
+ */
+ DISALLOW,
+
+ /**
+ * Filepatterns matching no resources are allowed if the filepattern contains a glob wildcard
+ * character (e.g. {@code /path/to/doesNotExist*}), and disallowed otherwise (i.e. if the
+ * filepattern specifies a single file). An allowed empty match behaves as with {@link #ALLOW};
+ * a disallowed one behaves as with {@link #DISALLOW}.
+ */
+ ALLOW_IF_WILDCARD
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/db9aede2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java
index 642c049..aa80b96 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java
@@ -21,6 +21,7 @@ import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
+import org.apache.beam.sdk.io.FileSystems;
/**
* The result of {@link org.apache.beam.sdk.io.FileSystem#match}.
@@ -78,7 +79,9 @@ public abstract class MatchResult {
public abstract Status status();
/**
- * {@link Metadata} of matched files.
+ * {@link Metadata} of matched files.
+ *
+ * <p>When a filepattern matches no files, the outcome depends on the {@link EmptyMatchTreatment}
+ * used in the {@link FileSystems#match} call. If the empty match is allowed, {@link #status()} is
+ * {@link Status#OK} and this returns an empty list. Otherwise {@link #status()} is {@link
+ * Status#NOT_FOUND} and this is expected to throw, typically a {@link
+ * java.io.FileNotFoundException}.
*/
public abstract List<Metadata> metadata() throws IOException;
http://git-wip-us.apache.org/repos/asf/beam/blob/db9aede2/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
index 1bdb915..ea9e06b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
@@ -47,6 +47,7 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileBasedSource.FileBasedReader;
import org.apache.beam.sdk.io.Source.Reader;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -94,6 +95,15 @@ public class FileBasedSourceTest {
}
+    /**
+     * Creates a test source over {@code fileOrPattern}. A pattern matching no files is handled
+     * according to {@code emptyMatchTreatment}.
+     */
public TestFileBasedSource(
+        String fileOrPattern,
+        EmptyMatchTreatment emptyMatchTreatment,
+        long minBundleSize,
+        String header) {
+      super(StaticValueProvider.of(fileOrPattern), emptyMatchTreatment, minBundleSize);
+      this.splitHeader = header;
+    }
+
+ public TestFileBasedSource(
Metadata fileOrPattern,
long minBundleSize,
long startOffset,
@@ -371,6 +381,47 @@ public class FileBasedSourceTest {
}
@Test
+  public void testEmptyFilepatternTreatmentDefaultDisallow() throws IOException {
+    PipelineOptions opts = PipelineOptionsFactory.create();
+    // No EmptyMatchTreatment is passed, so reading a path that matches nothing must fail.
+    String missingFile = new File(tempFolder.getRoot(), "doesNotExist").getPath();
+    TestFileBasedSource source = new TestFileBasedSource(missingFile, 64, null);
+
+    thrown.expect(FileNotFoundException.class);
+    readFromSource(source, opts);
+  }
+
+  @Test
+  public void testEmptyFilepatternTreatmentAllow() throws IOException {
+    PipelineOptions opts = PipelineOptionsFactory.create();
+    File root = tempFolder.getRoot();
+    // ALLOW tolerates a missing literal path; ALLOW_IF_WILDCARD tolerates a missing glob.
+    TestFileBasedSource literalSource =
+        new TestFileBasedSource(
+            new File(root, "doesNotExist").getPath(), EmptyMatchTreatment.ALLOW, 64, null);
+    TestFileBasedSource globSource =
+        new TestFileBasedSource(
+            new File(root, "doesNotExist*").getPath(),
+            EmptyMatchTreatment.ALLOW_IF_WILDCARD,
+            64,
+            null);
+
+    assertEquals(0, readFromSource(literalSource, opts).size());
+    assertEquals(0, readFromSource(globSource, opts).size());
+  }
+
+  @Test
+  public void testEmptyFilepatternTreatmentAllowIfWildcard() throws IOException {
+    PipelineOptions opts = PipelineOptionsFactory.create();
+    // The path has no wildcard, so ALLOW_IF_WILDCARD must still reject the empty match.
+    String literalPath = new File(tempFolder.getRoot(), "doesNotExist").getPath();
+    TestFileBasedSource source =
+        new TestFileBasedSource(literalPath, EmptyMatchTreatment.ALLOW_IF_WILDCARD, 64, null);
+
+    thrown.expect(FileNotFoundException.class);
+    readFromSource(source, opts);
+  }
+
+ @Test
public void testCloseUnstartedFilePatternReader() throws IOException {
PipelineOptions options = PipelineOptionsFactory.create();
List<String> data1 = createStringDataset(3, 50);