You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/04 23:44:15 UTC

[1/4] beam git commit: [BEAM-2512] Introduces TextIO.watchForNewFiles() and Match

Repository: beam
Updated Branches:
  refs/heads/master 8d337ff0e -> df36bd9d7


[BEAM-2512] Introduces TextIO.watchForNewFiles() and Match

Part of http://s.apache.org/textio-sdf, based on
http://s.apache.org/beam-watch-transform.

The Match transform can be useful for users who want to write their own
file-based connectors, or for advanced use cases such as watching for new
subdirectories to appear in a directory (using Match) and then watching
each of those subdirectories for new files and reading them
(using TextIO.readAll().watchForNewFiles()).

Additionally, this change finally makes it configurable whether
TextIO.read()/readAll() allow filepatterns that match no files.

By default, read() disallows filepatterns that match no files (to preserve
the old behavior). readAll() allows them if the filepattern contains a
wildcard. This is a reasonable default that read() arguably should have had
from the beginning, but changing read() now would break existing users.
watchForNewFiles() allows them unconditionally (because matching files might
appear later).


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fe002c22
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fe002c22
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fe002c22

Branch: refs/heads/master
Commit: fe002c221602a543b99afd6db910a7a60b259fa4
Parents: db9aede
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Aug 3 14:44:35 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Aug 4 16:38:23 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/annotations/Experimental.java      |   5 +-
 .../java/org/apache/beam/sdk/io/AvroIO.java     |   2 +
 .../main/java/org/apache/beam/sdk/io/Match.java | 156 +++++++++++++++++++
 .../beam/sdk/io/ReadAllViaFileBasedSource.java  |  46 +++---
 .../java/org/apache/beam/sdk/io/TextIO.java     | 156 ++++++++++++++++---
 .../org/apache/beam/sdk/transforms/DoFn.java    |  11 +-
 .../org/apache/beam/sdk/transforms/Watch.java   |  16 +-
 .../org/apache/beam/sdk/io/TextIOReadTest.java  |  54 ++++++-
 8 files changed, 384 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/fe002c22/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
index 8224ebb..80c4613 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/annotations/Experimental.java
@@ -72,8 +72,9 @@ public @interface Experimental {
     OUTPUT_TIME,
 
     /**
-     * <a href="https://s.apache.org/splittable-do-fn">Splittable DoFn</a>.
-     * Do not use: API is unstable and runner support is incomplete.
+     * <a href="https://s.apache.org/splittable-do-fn">Splittable DoFn</a>. See <a
+     * href="https://beam.apache.org/documentation/runners/capability-matrix/">capability matrix</a>
+     * for runner support.
      */
     SPLITTABLE_DO_FN,
 

http://git-wip-us.apache.org/repos/asf/beam/blob/fe002c22/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index cd5857c..653b806 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -478,6 +478,7 @@ public class AvroIO {
     public PCollection<T> expand(PCollection<String> input) {
       checkNotNull(getSchema(), "schema");
       return input
+          .apply(Match.filepatterns())
           .apply(
               "Read all via FileBasedSource",
               new ReadAllViaFileBasedSource<>(
@@ -632,6 +633,7 @@ public class AvroIO {
             }
           };
       return input
+          .apply(Match.filepatterns())
           .apply(
               "Parse all via FileBasedSource",
               new ReadAllViaFileBasedSource<>(

http://git-wip-us.apache.org/repos/asf/beam/blob/fe002c22/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Match.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Match.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Match.java
new file mode 100644
index 0000000..bb44fac
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Match.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io;
+
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollResult;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Matches each filepattern in a collection of filepatterns using {@link FileSystems#match}, and
+ * produces a collection of matched resources (both files and directories) as {@link Metadata}.
+ * Resources are not deduplicated between filepatterns, i.e. if the same resource matches multiple
+ * filepatterns, it will be produced multiple times.
+ *
+ * <p>By default, this transform matches each filepattern once and produces a bounded {@link
+ * PCollection}. To continuously watch each filepattern for new matches, use {@link
+ * Filepatterns#continuously(Duration, TerminationCondition)} - this will produce an unbounded
+ * {@link PCollection}.
+ *
+ * <p>By default, filepatterns matching no resources are treated according to {@link
+ * EmptyMatchTreatment#ALLOW_IF_WILDCARD}. To configure this behavior, use {@link
+ * Filepatterns#withEmptyMatchTreatment}.
+ */
+public class Match {
+  private static final Logger LOG = LoggerFactory.getLogger(Match.class);
+
+  /** See {@link Match}. */
+  public static Filepatterns filepatterns() {
+    return new AutoValue_Match_Filepatterns.Builder()
+        .setEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD)
+        .build();
+  }
+
+  /** Implementation of {@link #filepatterns}. */
+  @AutoValue
+  public abstract static class Filepatterns
+      extends PTransform<PCollection<String>, PCollection<Metadata>> {
+    abstract EmptyMatchTreatment getEmptyMatchTreatment();
+
+    @Nullable
+    abstract Duration getWatchInterval();
+
+    @Nullable
+    abstract TerminationCondition<String, ?> getWatchTerminationCondition();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment);
+
+      abstract Builder setWatchInterval(Duration watchInterval);
+
+      abstract Builder setWatchTerminationCondition(TerminationCondition<String, ?> condition);
+
+      abstract Filepatterns build();
+    }
+
+    /**
+     * Sets whether or not filepatterns matching no files are allowed. When using {@link
+     * #continuously}, they are always allowed, and this parameter is ignored.
+     */
+    public Filepatterns withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return toBuilder().setEmptyMatchTreatment(treatment).build();
+    }
+
+    /**
+     * Continuously watches for new resources matching the filepattern, repeatedly matching it at
+     * the given interval, until the given termination condition is reached. The returned {@link
+     * PCollection} is unbounded.
+     *
+     * <p>This works only in runners supporting {@link Experimental.Kind#SPLITTABLE_DO_FN}.
+     *
+     * @see TerminationCondition
+     */
+    @Experimental(Experimental.Kind.SPLITTABLE_DO_FN)
+    public Filepatterns continuously(
+        Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
+      return toBuilder()
+          .setWatchInterval(pollInterval)
+          .setWatchTerminationCondition(terminationCondition)
+          .build();
+    }
+
+    @Override
+    public PCollection<Metadata> expand(PCollection<String> input) {
+      if (getWatchInterval() == null) {
+        return input.apply("Match filepatterns", ParDo.of(new MatchFn(getEmptyMatchTreatment())));
+      } else {
+        return input
+            .apply(
+                "Continuously match filepatterns",
+                Watch.growthOf(new MatchPollFn())
+                    .withPollInterval(getWatchInterval())
+                    .withTerminationPerInput(getWatchTerminationCondition()))
+            .apply(Values.<Metadata>create());
+      }
+    }
+
+    private static class MatchFn extends DoFn<String, Metadata> {
+      private final EmptyMatchTreatment emptyMatchTreatment;
+
+      public MatchFn(EmptyMatchTreatment emptyMatchTreatment) {
+        this.emptyMatchTreatment = emptyMatchTreatment;
+      }
+
+      @ProcessElement
+      public void process(ProcessContext c) throws Exception {
+        String filepattern = c.element();
+        MatchResult match = FileSystems.match(filepattern, emptyMatchTreatment);
+        LOG.info("Matched {} files for pattern {}", match.metadata().size(), filepattern);
+        for (Metadata metadata : match.metadata()) {
+          c.output(metadata);
+        }
+      }
+    }
+
+    private static class MatchPollFn implements Watch.Growth.PollFn<String, Metadata> {
+      @Override
+      public PollResult<Metadata> apply(String input, Instant timestamp) throws Exception {
+        return PollResult.incomplete(
+            Instant.now(), FileSystems.match(input, EmptyMatchTreatment.ALLOW).metadata());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/fe002c22/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
index 66aa41e..990f508 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ReadAllViaFileBasedSource.java
@@ -21,7 +21,8 @@ import static com.google.common.base.Preconditions.checkArgument;
 
 import java.io.IOException;
 import java.util.concurrent.ThreadLocalRandom;
-import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -33,10 +34,14 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 
 /**
- * Reads each filepattern in the input {@link PCollection} using given parameters for splitting
- * files into offset ranges and for creating a {@link FileBasedSource} for a file.
+ * Reads each file in the input {@link PCollection} of {@link Metadata} using given parameters for
+ * splitting files into offset ranges and for creating a {@link FileBasedSource} for a file. The
+ * input {@link PCollection} must not contain {@link ResourceId#isDirectory directories}.
+ *
+ * <p>To obtain the collection of {@link Metadata} from a filepattern, use {@link
+ * Match#filepatterns()}.
  */
-class ReadAllViaFileBasedSource<T> extends PTransform<PCollection<String>, PCollection<T>> {
+class ReadAllViaFileBasedSource<T> extends PTransform<PCollection<Metadata>, PCollection<T>> {
   private final SerializableFunction<String, Boolean> isSplittable;
   private final long desiredBundleSizeBytes;
   private final SerializableFunction<String, FileBasedSource<T>> createSource;
@@ -51,13 +56,12 @@ class ReadAllViaFileBasedSource<T> extends PTransform<PCollection<String>, PColl
   }
 
   @Override
-  public PCollection<T> expand(PCollection<String> input) {
+  public PCollection<T> expand(PCollection<Metadata> input) {
     return input
-        .apply("Expand glob", ParDo.of(new ExpandGlobFn()))
         .apply(
             "Split into ranges",
             ParDo.of(new SplitIntoRangesFn(isSplittable, desiredBundleSizeBytes)))
-        .apply("Reshuffle", new ReshuffleWithUniqueKey<KV<MatchResult.Metadata, OffsetRange>>())
+        .apply("Reshuffle", new ReshuffleWithUniqueKey<KV<Metadata, OffsetRange>>())
         .apply("Read ranges", ParDo.of(new ReadFileRangesFn<T>(createSource)));
   }
 
@@ -86,23 +90,7 @@ class ReadAllViaFileBasedSource<T> extends PTransform<PCollection<String>, PColl
     }
   }
 
-  private static class ExpandGlobFn extends DoFn<String, MatchResult.Metadata> {
-    @ProcessElement
-    public void process(ProcessContext c) throws Exception {
-      MatchResult match = FileSystems.match(c.element());
-      checkArgument(
-          match.status().equals(MatchResult.Status.OK),
-          "Failed to match filepattern %s: %s",
-          c.element(),
-          match.status());
-      for (MatchResult.Metadata metadata : match.metadata()) {
-        c.output(metadata);
-      }
-    }
-  }
-
-  private static class SplitIntoRangesFn
-      extends DoFn<MatchResult.Metadata, KV<MatchResult.Metadata, OffsetRange>> {
+  private static class SplitIntoRangesFn extends DoFn<Metadata, KV<Metadata, OffsetRange>> {
     private final SerializableFunction<String, Boolean> isSplittable;
     private final long desiredBundleSizeBytes;
 
@@ -114,7 +102,11 @@ class ReadAllViaFileBasedSource<T> extends PTransform<PCollection<String>, PColl
 
     @ProcessElement
     public void process(ProcessContext c) {
-      MatchResult.Metadata metadata = c.element();
+      Metadata metadata = c.element();
+      checkArgument(
+          !metadata.resourceId().isDirectory(),
+          "Resource %s is a directory",
+          metadata.resourceId());
       if (!metadata.isReadSeekEfficient()
           || !isSplittable.apply(metadata.resourceId().toString())) {
         c.output(KV.of(metadata, new OffsetRange(0, metadata.sizeBytes())));
@@ -127,7 +119,7 @@ class ReadAllViaFileBasedSource<T> extends PTransform<PCollection<String>, PColl
     }
   }
 
-  private static class ReadFileRangesFn<T> extends DoFn<KV<MatchResult.Metadata, OffsetRange>, T> {
+  private static class ReadFileRangesFn<T> extends DoFn<KV<Metadata, OffsetRange>, T> {
     private final SerializableFunction<String, FileBasedSource<T>> createSource;
 
     private ReadFileRangesFn(SerializableFunction<String, FileBasedSource<T>> createSource) {
@@ -136,7 +128,7 @@ class ReadAllViaFileBasedSource<T> extends PTransform<PCollection<String>, PColl
 
     @ProcessElement
     public void process(ProcessContext c) throws IOException {
-      MatchResult.Metadata metadata = c.element().getKey();
+      Metadata metadata = c.element().getKey();
       OffsetRange range = c.element().getValue();
       FileBasedSource<T> source = createSource.apply(metadata.toString());
       try (BoundedSource.BoundedReader<T> reader =

http://git-wip-us.apache.org/repos/asf/beam/blob/fe002c22/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 9a14ad9..612f5c5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
 import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
@@ -44,10 +45,12 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
+import org.joda.time.Duration;
 
 /**
  * {@link PTransform}s for reading and writing text files.
@@ -57,9 +60,16 @@ import org.apache.beam.sdk.values.PDone;
  * file(s) to be read. Alternatively, if the filenames to be read are themselves in a {@link
  * PCollection}, apply {@link TextIO#readAll()}.
  *
- * <p>{@link TextIO.Read} returns a {@link PCollection} of {@link String Strings}, each
- * corresponding to one line of an input UTF-8 text file (split into lines delimited by '\n', '\r',
- * or '\r\n').
+ * <p>{@link #read} returns a {@link PCollection} of {@link String Strings}, each corresponding to
+ * one line of an input UTF-8 text file (split into lines delimited by '\n', '\r', or '\r\n').
+ *
+ * <p>By default, the filepatterns are expanded only once. {@link Read#watchForNewFiles} and {@link
+ * ReadAll#watchForNewFiles} allow streaming of new files matching the filepattern(s).
+ *
+ * <p>By default, {@link #read} prohibits filepatterns that match no files, and {@link #readAll}
+ * allows them in case the filepattern contains a glob wildcard character. Use {@link
+ * TextIO.Read#withEmptyMatchTreatment} and {@link TextIO.ReadAll#withEmptyMatchTreatment} to
+ * configure this behavior.
  *
  * <p>Example 1: reading a file or filepattern.
  *
@@ -88,6 +98,20 @@ import org.apache.beam.sdk.values.PDone;
  * PCollection<String> lines = filenames.apply(TextIO.readAll());
  * }</pre>
  *
+ * <p>Example 3: streaming new files matching a filepattern.
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * PCollection<String> lines = p.apply(TextIO.read()
+ *     .from("/local/path/to/files/*")
+ *     .watchForNewFiles(
+ *       // Check for new files every minute
+ *       Duration.standardMinutes(1),
+ *       // Stop watching the filepattern if no new files appear within an hour
+ *       afterTimeSinceNewOutput(Duration.standardHours(1))));
+ * }</pre>
+ *
  * <p>To write a {@link PCollection} to one or more text files, use {@code TextIO.write()}, using
  * {@link TextIO.Write#to(String)} to specify the output prefix of the files to write.
  *
@@ -153,6 +177,7 @@ public class TextIO {
     return new AutoValue_TextIO_Read.Builder()
         .setCompressionType(CompressionType.AUTO)
         .setHintMatchesManyFiles(false)
+        .setEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW)
         .build();
   }
 
@@ -173,6 +198,7 @@ public class TextIO {
         // but is not so large as to exhaust a typical runner's maximum amount of output per
         // ProcessElement call.
         .setDesiredBundleSizeBytes(64 * 1024 * 1024L)
+        .setEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD)
         .build();
   }
 
@@ -219,7 +245,15 @@ public class TextIO {
   public abstract static class Read extends PTransform<PBegin, PCollection<String>> {
     @Nullable abstract ValueProvider<String> getFilepattern();
     abstract CompressionType getCompressionType();
+
+    @Nullable
+    abstract Duration getWatchForNewFilesInterval();
+
+    @Nullable
+    abstract TerminationCondition getWatchForNewFilesTerminationCondition();
+
     abstract boolean getHintMatchesManyFiles();
+    abstract EmptyMatchTreatment getEmptyMatchTreatment();
 
     abstract Builder toBuilder();
 
@@ -227,7 +261,10 @@ public class TextIO {
     abstract static class Builder {
       abstract Builder setFilepattern(ValueProvider<String> filepattern);
       abstract Builder setCompressionType(CompressionType compressionType);
+      abstract Builder setWatchForNewFilesInterval(Duration watchForNewFilesInterval);
+      abstract Builder setWatchForNewFilesTerminationCondition(TerminationCondition condition);
       abstract Builder setHintMatchesManyFiles(boolean hintManyFiles);
+      abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment);
 
       abstract Read build();
     }
@@ -257,8 +294,7 @@ public class TextIO {
     }
 
     /**
-     * Returns a new transform for reading from text files that's like this one but
-     * reads from input sources using the specified compression type.
+     * Reads from input sources using the specified compression type.
      *
      * <p>If no compression type is specified, the default is {@link TextIO.CompressionType#AUTO}.
      */
@@ -267,6 +303,23 @@ public class TextIO {
     }
 
     /**
+     * Continuously watches for new files matching the filepattern, polling it at the given
+     * interval, until the given termination condition is reached. The returned {@link PCollection}
+     * is unbounded.
+     *
+     * <p>This works only in runners supporting {@link Kind#SPLITTABLE_DO_FN}.
+     *
+     * @see TerminationCondition
+     */
+    @Experimental(Kind.SPLITTABLE_DO_FN)
+    public Read watchForNewFiles(Duration pollInterval, TerminationCondition terminationCondition) {
+      return toBuilder()
+          .setWatchForNewFilesInterval(pollInterval)
+          .setWatchForNewFilesTerminationCondition(terminationCondition)
+          .build();
+    }
+
+    /**
      * Hints that the filepattern specified in {@link #from(String)} matches a very large number of
      * files.
      *
@@ -279,20 +332,40 @@ public class TextIO {
       return toBuilder().setHintMatchesManyFiles(true).build();
     }
 
+    /**
+     * Configures whether or not a filepattern matching no files is allowed. When using {@link
+     * #watchForNewFiles}, it is always allowed and this parameter is ignored.
+     */
+    public Read withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return toBuilder().setEmptyMatchTreatment(treatment).build();
+    }
+
     @Override
     public PCollection<String> expand(PBegin input) {
       checkNotNull(getFilepattern(), "need to set the filepattern of a TextIO.Read transform");
-      return getHintMatchesManyFiles()
-          ? input
-              .apply(
-                  "Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
-              .apply(readAll().withCompressionType(getCompressionType()))
-          : input.apply("Read", org.apache.beam.sdk.io.Read.from(getSource()));
+      if (getWatchForNewFilesInterval() == null && !getHintMatchesManyFiles()) {
+        return input.apply("Read", org.apache.beam.sdk.io.Read.from(getSource()));
+      }
+      // All other cases go through ReadAll.
+      ReadAll readAll =
+          readAll()
+              .withCompressionType(getCompressionType())
+              .withEmptyMatchTreatment(getEmptyMatchTreatment());
+      if (getWatchForNewFilesInterval() != null) {
+        readAll =
+            readAll.watchForNewFiles(
+                getWatchForNewFilesInterval(), getWatchForNewFilesTerminationCondition());
+      }
+      return input
+          .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
+          .apply("Via ReadAll", readAll);
     }
 
     // Helper to create a source specific to the requested compression type.
     protected FileBasedSource<String> getSource() {
-      return wrapWithCompression(new TextSource(getFilepattern()), getCompressionType());
+      return wrapWithCompression(
+          new TextSource(getFilepattern(), getEmptyMatchTreatment()),
+          getCompressionType());
     }
 
     private static FileBasedSource<String> wrapWithCompression(
@@ -330,10 +403,17 @@ public class TextIO {
       String filepatternDisplay = getFilepattern().isAccessible()
         ? getFilepattern().get() : getFilepattern().toString();
       builder
-          .add(DisplayData.item("compressionType", getCompressionType().toString())
-            .withLabel("Compression Type"))
-          .addIfNotNull(DisplayData.item("filePattern", filepatternDisplay)
-            .withLabel("File Pattern"));
+          .add(
+              DisplayData.item("compressionType", getCompressionType().toString())
+                  .withLabel("Compression Type"))
+          .addIfNotNull(
+              DisplayData.item("filePattern", filepatternDisplay).withLabel("File Pattern"))
+          .add(
+              DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString())
+                  .withLabel("Treatment of filepatterns that match no files"))
+          .addIfNotNull(
+              DisplayData.item("watchForNewFilesInterval", getWatchForNewFilesInterval())
+                  .withLabel("Interval to watch for new files"));
     }
   }
 
@@ -344,6 +424,14 @@ public class TextIO {
   public abstract static class ReadAll
       extends PTransform<PCollection<String>, PCollection<String>> {
     abstract CompressionType getCompressionType();
+
+    @Nullable
+    abstract Duration getWatchForNewFilesInterval();
+
+    @Nullable
+    abstract TerminationCondition<String, ?> getWatchForNewFilesTerminationCondition();
+
+    abstract EmptyMatchTreatment getEmptyMatchTreatment();
     abstract long getDesiredBundleSizeBytes();
 
     abstract Builder toBuilder();
@@ -351,6 +439,10 @@ public class TextIO {
     @AutoValue.Builder
     abstract static class Builder {
       abstract Builder setCompressionType(CompressionType compressionType);
+      abstract Builder setWatchForNewFilesInterval(Duration watchForNewFilesInterval);
+      abstract Builder setWatchForNewFilesTerminationCondition(
+          TerminationCondition<String, ?> condition);
+      abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment);
       abstract Builder setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
 
       abstract ReadAll build();
@@ -361,6 +453,21 @@ public class TextIO {
       return toBuilder().setCompressionType(compressionType).build();
     }
 
+    /** Same as {@link Read#withEmptyMatchTreatment}. */
+    public ReadAll withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return toBuilder().setEmptyMatchTreatment(treatment).build();
+    }
+
+    /** Same as {@link Read#watchForNewFiles(Duration, TerminationCondition)}. */
+    @Experimental(Kind.SPLITTABLE_DO_FN)
+    public ReadAll watchForNewFiles(
+        Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
+      return toBuilder()
+          .setWatchForNewFilesInterval(pollInterval)
+          .setWatchForNewFilesTerminationCondition(terminationCondition)
+          .build();
+    }
+
     @VisibleForTesting
     ReadAll withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
       return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
@@ -368,13 +475,21 @@ public class TextIO {
 
     @Override
     public PCollection<String> expand(PCollection<String> input) {
+      Match.Filepatterns matchFilepatterns =
+          Match.filepatterns().withEmptyMatchTreatment(getEmptyMatchTreatment());
+      if (getWatchForNewFilesInterval() != null) {
+        matchFilepatterns =
+            matchFilepatterns.continuously(
+                getWatchForNewFilesInterval(), getWatchForNewFilesTerminationCondition());
+      }
       return input
+          .apply(matchFilepatterns)
           .apply(
               "Read all via FileBasedSource",
               new ReadAllViaFileBasedSource<>(
                   new IsSplittableFn(getCompressionType()),
                   getDesiredBundleSizeBytes(),
-                  new CreateTextSourceFn(getCompressionType())))
+                  new CreateTextSourceFn(getCompressionType(), getEmptyMatchTreatment())))
           .setCoder(StringUtf8Coder.of());
     }
 
@@ -390,15 +505,18 @@ public class TextIO {
     private static class CreateTextSourceFn
         implements SerializableFunction<String, FileBasedSource<String>> {
       private final CompressionType compressionType;
+      private final EmptyMatchTreatment emptyMatchTreatment;
 
-      private CreateTextSourceFn(CompressionType compressionType) {
+      private CreateTextSourceFn(
+          CompressionType compressionType, EmptyMatchTreatment emptyMatchTreatment) {
         this.compressionType = compressionType;
+        this.emptyMatchTreatment = emptyMatchTreatment;
       }
 
       @Override
       public FileBasedSource<String> apply(String input) {
         return Read.wrapWithCompression(
-            new TextSource(StaticValueProvider.of(input)), compressionType);
+            new TextSource(StaticValueProvider.of(input), emptyMatchTreatment), compressionType);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/fe002c22/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 37c6263..3e023db 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -524,12 +524,15 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
    * <li>It must return {@code void}.
    * </ul>
    *
-   * <h2>Splittable DoFn's (WARNING: work in progress, do not use)</h2>
+   * <h2>Splittable DoFn's</h2>
    *
    * <p>A {@link DoFn} is <i>splittable</i> if its {@link ProcessElement} method has a parameter
    * whose type is a subtype of {@link RestrictionTracker}. This is an advanced feature and an
-   * overwhelming majority of users will never need to write a splittable {@link DoFn}. Right now
-   * the implementation of this feature is in progress and it's not ready for any use.
+   * overwhelming majority of users will never need to write a splittable {@link DoFn}.
+   *
+   * <p>Not all runners support Splittable DoFn. See the
+   * <a href="https://beam.apache.org/documentation/runners/capability-matrix/">capability
+   * matrix</a>.
    *
    * <p>See <a href="https://s.apache.org/splittable-do-fn">the proposal</a> for an overview of the
    * involved concepts (<i>splittable DoFn</i>, <i>restriction</i>, <i>restriction tracker</i>).
@@ -558,8 +561,6 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
    * </ul>
    *
    * <p>A non-splittable {@link DoFn} <i>must not</i> define any of these methods.
-   *
-   * <p>More documentation will be added when the feature becomes ready for general usage.
    */
   @Documented
   @Retention(RetentionPolicy.RUNTIME)

http://git-wip-us.apache.org/repos/asf/beam/blob/fe002c22/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
index fc6f18d..9da2408 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
@@ -38,7 +38,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
-import java.lang.reflect.TypeVariable;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.LinkedList;
@@ -64,6 +63,8 @@ import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.beam.sdk.values.TypeDescriptors;
+import org.apache.beam.sdk.values.TypeDescriptors.TypeVariableExtractor;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.joda.time.ReadableDuration;
@@ -554,14 +555,13 @@ public class Watch {
       if (outputCoder == null) {
         // If a coder was not specified explicitly, infer it from the OutputT type parameter
         // of the PollFn.
-        TypeDescriptor<?> superDescriptor =
-            TypeDescriptor.of(getPollFn().getClass()).getSupertype(PollFn.class);
-        TypeVariable typeVariable = superDescriptor.getTypeParameter("OutputT");
-        @SuppressWarnings("unchecked")
-        TypeDescriptor<OutputT> descriptor =
-            (TypeDescriptor<OutputT>) superDescriptor.resolveType(typeVariable);
+        TypeDescriptor<OutputT> outputT =
+            TypeDescriptors.extractFromTypeParameters(
+                getPollFn(),
+                PollFn.class,
+                new TypeVariableExtractor<PollFn<InputT, OutputT>, OutputT>() {});
         try {
-          outputCoder = input.getPipeline().getCoderRegistry().getCoder(descriptor);
+          outputCoder = input.getPipeline().getCoderRegistry().getCoder(outputT);
         } catch (CannotProvideCoderException e) {
           throw new RuntimeException(
               "Unable to infer coder for OutputT. Specify it explicitly using withOutputCoder().");

http://git-wip-us.apache.org/repos/asf/beam/blob/fe002c22/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
index 8ad6030..aa6090d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOReadTest.java
@@ -25,6 +25,7 @@ import static org.apache.beam.sdk.io.TextIO.CompressionType.DEFLATE;
 import static org.apache.beam.sdk.io.TextIO.CompressionType.GZIP;
 import static org.apache.beam.sdk.io.TextIO.CompressionType.UNCOMPRESSED;
 import static org.apache.beam.sdk.io.TextIO.CompressionType.ZIP;
+import static org.apache.beam.sdk.transforms.Watch.Growth.afterTimeSinceNewOutput;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -63,6 +64,7 @@ import java.util.zip.ZipOutputStream;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
 import org.apache.beam.sdk.io.TextIO.CompressionType;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.ValueProvider;
@@ -70,6 +72,7 @@ import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.UsesSplittableParDo;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.display.DisplayData;
@@ -78,6 +81,7 @@ import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;
 import org.apache.commons.compress.compressors.deflate.DeflateCompressorOutputStream;
+import org.joda.time.Duration;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -787,7 +791,8 @@ public class TextIOReadTest {
   private TextSource prepareSource(byte[] data) throws IOException {
     Path path = Files.createTempFile(tempFolder, "tempfile", "ext");
     Files.write(path, data);
-    return new TextSource(ValueProvider.StaticValueProvider.of(path.toString()));
+    return new TextSource(
+        ValueProvider.StaticValueProvider.of(path.toString()), EmptyMatchTreatment.DISALLOW);
   }
 
   @Test
@@ -872,4 +877,51 @@ public class TextIOReadTest {
     PAssert.that(lines).containsInAnyOrder(Iterables.concat(TINY, TINY, LARGE, LARGE));
     p.run();
   }
+
+  @Test
+  @Category({NeedsRunner.class, UsesSplittableParDo.class})
+  public void testReadWatchForNewFiles() throws IOException, InterruptedException {
+    final Path basePath = tempFolder.resolve("readWatch");
+    basePath.toFile().mkdir();
+    PCollection<String> lines =
+        p.apply(
+            TextIO.read()
+                .from(basePath.resolve("*").toString())
+                // Make sure that compression type propagates into readAll()
+                .withCompressionType(ZIP)
+                .watchForNewFiles(
+                    Duration.millis(100), afterTimeSinceNewOutput(Duration.standardSeconds(3))));
+
+    Thread writer =
+        new Thread() {
+          @Override
+          public void run() {
+            try {
+              Thread.sleep(1000);
+              writeToFile(
+                  Arrays.asList("a.1", "a.2"),
+                  basePath.resolve("fileA").toString(),
+                  CompressionType.ZIP);
+              Thread.sleep(300);
+              writeToFile(
+                  Arrays.asList("b.1", "b.2"),
+                  basePath.resolve("fileB").toString(),
+                  CompressionType.ZIP);
+              Thread.sleep(300);
+              writeToFile(
+                  Arrays.asList("c.1", "c.2"),
+                  basePath.resolve("fileC").toString(),
+                  CompressionType.ZIP);
+            } catch (IOException | InterruptedException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        };
+    writer.start();
+
+    PAssert.that(lines).containsInAnyOrder("a.1", "a.2", "b.1", "b.2", "c.1", "c.2");
+    p.run();
+
+    writer.join();
+  }
 }


[3/4] beam git commit: Adds coders for boolean, ResourceId and Metadata

Posted by jk...@apache.org.
Adds coders for boolean, ResourceId and Metadata


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5e43b238
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5e43b238
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5e43b238

Branch: refs/heads/master
Commit: 5e43b2388652f38a37ab3378a63ae88e6ad53ee3
Parents: 8d337ff
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Aug 3 14:42:07 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Aug 4 16:38:23 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/coders/BooleanCoder.java    | 59 ++++++++++++++++++
 .../apache/beam/sdk/coders/CoderRegistry.java   | 10 ++++
 .../apache/beam/sdk/io/fs/MetadataCoder.java    | 63 ++++++++++++++++++++
 .../apache/beam/sdk/io/fs/ResourceIdCoder.java  | 56 +++++++++++++++++
 .../org/apache/beam/sdk/transforms/Watch.java   |  7 ++-
 5 files changed, 192 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5e43b238/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BooleanCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BooleanCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BooleanCoder.java
new file mode 100644
index 0000000..e7f7543
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/BooleanCoder.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.coders;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/** A {@link Coder} for {@link Boolean}. */
+public class BooleanCoder extends AtomicCoder<Boolean> {
+  private static final ByteCoder BYTE_CODER = ByteCoder.of();
+
+  private static final BooleanCoder INSTANCE = new BooleanCoder();
+
+  /** Returns the singleton instance of {@link BooleanCoder}. */
+  public static BooleanCoder of() {
+    return INSTANCE;
+  }
+
+  @Override
+  public void encode(Boolean value, OutputStream os) throws IOException {
+    BYTE_CODER.encode(value ? (byte) 1 : 0, os);
+  }
+
+  @Override
+  public Boolean decode(InputStream is) throws IOException {
+    return BYTE_CODER.decode(is) == 1;
+  }
+
+  @Override
+  public boolean consistentWithEquals() {
+    return true;
+  }
+
+  @Override
+  public boolean isRegisterByteSizeObserverCheap(Boolean value) {
+    return true;
+  }
+
+  @Override
+  protected long getEncodedElementByteSize(Boolean value) throws Exception {
+    return 1;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5e43b238/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
index 48389b1..c335bda 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/CoderRegistry.java
@@ -43,6 +43,10 @@ import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.coders.CannotProvideCoderException.ReasonCode;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+import org.apache.beam.sdk.io.fs.MetadataCoder;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.io.fs.ResourceIdCoder;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.util.CoderUtils;
@@ -89,6 +93,8 @@ public class CoderRegistry {
 
     private CommonTypes() {
       ImmutableMap.Builder<Class<?>, CoderProvider> builder = ImmutableMap.builder();
+      builder.put(Boolean.class,
+          CoderProviders.fromStaticMethods(Boolean.class, BooleanCoder.class));
       builder.put(Byte.class,
           CoderProviders.fromStaticMethods(Byte.class, ByteCoder.class));
       builder.put(BitSet.class,
@@ -109,6 +115,10 @@ public class CoderRegistry {
           CoderProviders.fromStaticMethods(Long.class, VarLongCoder.class));
       builder.put(Map.class,
           CoderProviders.fromStaticMethods(Map.class, MapCoder.class));
+      builder.put(Metadata.class,
+          CoderProviders.fromStaticMethods(Metadata.class, MetadataCoder.class));
+      builder.put(ResourceId.class,
+          CoderProviders.fromStaticMethods(ResourceId.class, ResourceIdCoder.class));
       builder.put(Set.class,
           CoderProviders.fromStaticMethods(Set.class, SetCoder.class));
       builder.put(String.class,

http://git-wip-us.apache.org/repos/asf/beam/blob/5e43b238/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java
new file mode 100644
index 0000000..5c9c4d7
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MetadataCoder.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fs;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
+
+/** A {@link Coder} for {@link Metadata}. */
+public class MetadataCoder extends AtomicCoder<Metadata> {
+  private static final ResourceIdCoder RESOURCE_ID_CODER = ResourceIdCoder.of();
+  private static final VarIntCoder INT_CODER = VarIntCoder.of();
+  private static final VarLongCoder LONG_CODER = VarLongCoder.of();
+
+  /** Creates a {@link MetadataCoder}. */
+  public static MetadataCoder of() {
+    return new MetadataCoder();
+  }
+
+  @Override
+  public void encode(Metadata value, OutputStream os) throws IOException {
+    RESOURCE_ID_CODER.encode(value.resourceId(), os);
+    INT_CODER.encode(value.isReadSeekEfficient() ? 1 : 0, os);
+    LONG_CODER.encode(value.sizeBytes(), os);
+  }
+
+  @Override
+  public Metadata decode(InputStream is) throws IOException {
+    ResourceId resourceId = RESOURCE_ID_CODER.decode(is);
+    boolean isReadSeekEfficient = INT_CODER.decode(is) == 1;
+    long sizeBytes = LONG_CODER.decode(is);
+    return Metadata.builder()
+        .setResourceId(resourceId)
+        .setIsReadSeekEfficient(isReadSeekEfficient)
+        .setSizeBytes(sizeBytes)
+        .build();
+  }
+
+  @Override
+  public boolean consistentWithEquals() {
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5e43b238/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceIdCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceIdCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceIdCoder.java
new file mode 100644
index 0000000..d7649c0
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/ResourceIdCoder.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fs;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.BooleanCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.FileSystems;
+
+/** A {@link Coder} for {@link ResourceId}. */
+public class ResourceIdCoder extends AtomicCoder<ResourceId> {
+  private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+  private static final Coder<Boolean> BOOL_CODER = BooleanCoder.of();
+
+  /** Creates a {@link ResourceIdCoder}. */
+  public static ResourceIdCoder of() {
+    return new ResourceIdCoder();
+  }
+
+  @Override
+  public void encode(ResourceId value, OutputStream os) throws IOException {
+    STRING_CODER.encode(value.toString(), os);
+    BOOL_CODER.encode(value.isDirectory(), os);
+  }
+
+  @Override
+  public ResourceId decode(InputStream is) throws IOException {
+    String spec = STRING_CODER.decode(is);
+    boolean isDirectory = BOOL_CODER.decode(is);
+    return FileSystems.matchNewResource(spec, isDirectory);
+  }
+
+  @Override
+  public boolean consistentWithEquals() {
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/5e43b238/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
index b21eb62..fc6f18d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
@@ -47,6 +47,7 @@ import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.BooleanCoder;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.DurationCoder;
@@ -958,7 +959,7 @@ public class Watch {
       return new GrowthStateCoder<>(outputCoder, terminationStateCoder);
     }
 
-    private static final Coder<Integer> INT_CODER = VarIntCoder.of();
+    private static final Coder<Boolean> BOOLEAN_CODER = BooleanCoder.of();
     private static final Coder<Instant> INSTANT_CODER = NullableCoder.of(InstantCoder.of());
     private static final Coder<HashCode> HASH_CODE_CODER = HashCode128Coder.of();
 
@@ -980,7 +981,7 @@ public class Watch {
         throws IOException {
       completedCoder.encode(value.completed, os);
       pendingCoder.encode(value.pending, os);
-      INT_CODER.encode(value.isOutputComplete ? 1 : 0, os);
+      BOOLEAN_CODER.encode(value.isOutputComplete, os);
       terminationStateCoder.encode(value.terminationState, os);
       INSTANT_CODER.encode(value.pollWatermark, os);
     }
@@ -989,7 +990,7 @@ public class Watch {
     public GrowthState<OutputT, TerminationStateT> decode(InputStream is) throws IOException {
       Map<HashCode, Instant> completed = completedCoder.decode(is);
       List<TimestampedValue<OutputT>> pending = pendingCoder.decode(is);
-      boolean isOutputComplete = (INT_CODER.decode(is) == 1);
+      boolean isOutputComplete = BOOLEAN_CODER.decode(is);
       TerminationStateT terminationState = terminationStateCoder.decode(is);
       Instant pollWatermark = INSTANT_CODER.decode(is);
       return new GrowthState<>(


[4/4] beam git commit: This closes #3607: [BEAM-2512] Introduces TextIO.watchForNewFiles() and the Match transform

Posted by jk...@apache.org.
This closes #3607: [BEAM-2512] Introduces TextIO.watchForNewFiles() and the Match transform


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/df36bd9d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/df36bd9d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/df36bd9d

Branch: refs/heads/master
Commit: df36bd9d7979bd6956ec299a6f63d5a0237d031a
Parents: 8d337ff fe002c2
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Aug 4 16:38:39 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Aug 4 16:38:39 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/annotations/Experimental.java      |   5 +-
 .../apache/beam/sdk/coders/BooleanCoder.java    |  59 +++++++
 .../apache/beam/sdk/coders/CoderRegistry.java   |  10 ++
 .../java/org/apache/beam/sdk/io/AvroIO.java     |   2 +
 .../org/apache/beam/sdk/io/FileBasedSource.java |  52 ++++---
 .../org/apache/beam/sdk/io/FileSystems.java     |  46 ++++++
 .../main/java/org/apache/beam/sdk/io/Match.java | 156 +++++++++++++++++++
 .../beam/sdk/io/ReadAllViaFileBasedSource.java  |  46 +++---
 .../java/org/apache/beam/sdk/io/TextIO.java     | 156 ++++++++++++++++---
 .../java/org/apache/beam/sdk/io/TextSource.java |   7 +-
 .../beam/sdk/io/fs/EmptyMatchTreatment.java     |  46 ++++++
 .../org/apache/beam/sdk/io/fs/MatchResult.java  |   5 +-
 .../apache/beam/sdk/io/fs/MetadataCoder.java    |  63 ++++++++
 .../apache/beam/sdk/io/fs/ResourceIdCoder.java  |  56 +++++++
 .../org/apache/beam/sdk/transforms/DoFn.java    |  11 +-
 .../org/apache/beam/sdk/transforms/Watch.java   |  23 +--
 .../apache/beam/sdk/io/FileBasedSourceTest.java |  51 ++++++
 .../org/apache/beam/sdk/io/TextIOReadTest.java  |  54 ++++++-
 18 files changed, 756 insertions(+), 92 deletions(-)
----------------------------------------------------------------------



[2/4] beam git commit: Introduces EmptyMatchTreatment parameter to FileSystems.match()

Posted by jk...@apache.org.
Introduces EmptyMatchTreatment parameter to FileSystems.match()


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/db9aede2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/db9aede2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/db9aede2

Branch: refs/heads/master
Commit: db9aede289f8546bb30113353f07aa75daa83eba
Parents: 5e43b23
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Aug 3 14:43:48 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Fri Aug 4 16:38:23 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/FileBasedSource.java | 52 ++++++++++----------
 .../org/apache/beam/sdk/io/FileSystems.java     | 46 +++++++++++++++++
 .../java/org/apache/beam/sdk/io/TextSource.java |  7 ++-
 .../beam/sdk/io/fs/EmptyMatchTreatment.java     | 46 +++++++++++++++++
 .../org/apache/beam/sdk/io/fs/MatchResult.java  |  5 +-
 .../apache/beam/sdk/io/FileBasedSourceTest.java | 51 +++++++++++++++++++
 6 files changed, 180 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/db9aede2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
index d4413c9..7f865de 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
@@ -23,19 +23,17 @@ import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Verify.verify;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
 import java.io.IOException;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.SeekableByteChannel;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.NoSuchElementException;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
 import org.apache.beam.sdk.io.fs.MatchResult;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
-import org.apache.beam.sdk.io.fs.MatchResult.Status;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
@@ -68,6 +66,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
   private static final Logger LOG = LoggerFactory.getLogger(FileBasedSource.class);
 
   private final ValueProvider<String> fileOrPatternSpec;
+  private final EmptyMatchTreatment emptyMatchTreatment;
   @Nullable private MatchResult.Metadata singleFileMetadata;
   private final Mode mode;
 
@@ -80,15 +79,28 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
   }
 
   /**
-   * Create a {@code FileBaseSource} based on a file or a file pattern specification.
+   * Create a {@code FileBasedSource} based on a file or a file pattern specification, with the
+   * given strategy for treating filepatterns that do not match any files.
    */
-  protected FileBasedSource(ValueProvider<String> fileOrPatternSpec, long minBundleSize) {
+  protected FileBasedSource(
+      ValueProvider<String> fileOrPatternSpec,
+      EmptyMatchTreatment emptyMatchTreatment,
+      long minBundleSize) {
     super(0, Long.MAX_VALUE, minBundleSize);
-    mode = Mode.FILEPATTERN;
+    this.mode = Mode.FILEPATTERN;
+    this.emptyMatchTreatment = emptyMatchTreatment;
     this.fileOrPatternSpec = fileOrPatternSpec;
   }
 
   /**
+   * Like {@link #FileBasedSource(ValueProvider, EmptyMatchTreatment, long)}, but uses the default
+   * value of {@link EmptyMatchTreatment#DISALLOW}.
+   */
+  protected FileBasedSource(ValueProvider<String> fileOrPatternSpec, long minBundleSize) {
+    this(fileOrPatternSpec, EmptyMatchTreatment.DISALLOW, minBundleSize);
+  }
+
+  /**
    * Create a {@code FileBasedSource} based on a single file. This constructor must be used when
    * creating a new {@code FileBasedSource} for a subrange of a single file.
    * Additionally, this constructor must be used to create new {@code FileBasedSource}s when
@@ -110,6 +122,9 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
     mode = Mode.SINGLE_FILE_OR_SUBRANGE;
     this.singleFileMetadata = checkNotNull(fileMetadata, "fileMetadata");
     this.fileOrPatternSpec = StaticValueProvider.of(fileMetadata.resourceId().toString());
+
+    // This field will be unused in this mode.
+    this.emptyMatchTreatment = null;
   }
 
   /**
@@ -204,14 +219,7 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
 
     if (mode == Mode.FILEPATTERN) {
       long totalSize = 0;
-      List<MatchResult> inputs = FileSystems.match(Collections.singletonList(fileOrPattern));
-      MatchResult result = Iterables.getOnlyElement(inputs);
-      checkArgument(
-          result.status() == Status.OK,
-          "Error matching the pattern or glob %s: status %s",
-          fileOrPattern,
-          result.status());
-      List<Metadata> allMatches = result.metadata();
+      List<Metadata> allMatches = FileSystems.match(fileOrPattern, emptyMatchTreatment).metadata();
       for (Metadata metadata : allMatches) {
         totalSize += metadata.sizeBytes();
       }
@@ -254,9 +262,8 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
 
     if (mode == Mode.FILEPATTERN) {
       long startTime = System.currentTimeMillis();
-      List<Metadata> expandedFiles = FileBasedSource.expandFilePattern(fileOrPattern);
-      checkArgument(!expandedFiles.isEmpty(),
-          "Unable to find any files matching %s", fileOrPattern);
+      List<Metadata> expandedFiles =
+          FileSystems.match(fileOrPattern, emptyMatchTreatment).metadata();
       List<FileBasedSource<T>> splitResults = new ArrayList<>(expandedFiles.size());
       for (Metadata metadata : expandedFiles) {
         FileBasedSource<T> split = createForSubrangeOfFile(metadata, 0, metadata.sizeBytes());
@@ -327,7 +334,9 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
 
     if (mode == Mode.FILEPATTERN) {
       long startTime = System.currentTimeMillis();
-      List<Metadata> fileMetadata = FileBasedSource.expandFilePattern(fileOrPattern);
+      List<Metadata> fileMetadata =
+          FileSystems.match(fileOrPattern, emptyMatchTreatment).metadata();
+      LOG.info("Matched {} files for pattern {}", fileMetadata.size(), fileOrPattern);
       List<FileBasedReader<T>> fileReaders = new ArrayList<>();
       for (Metadata metadata : fileMetadata) {
         long endOffset = metadata.sizeBytes();
@@ -389,13 +398,6 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
     return metadata.sizeBytes();
   }
 
-  private static List<Metadata> expandFilePattern(String fileOrPatternSpec) throws IOException {
-    MatchResult matches =
-        Iterables.getOnlyElement(FileSystems.match(Collections.singletonList(fileOrPatternSpec)));
-    LOG.info("Matched {} files for pattern {}", matches.metadata().size(), fileOrPatternSpec);
-    return ImmutableList.copyOf(matches.metadata());
-  }
-
   /**
    * A {@link Source.Reader reader} that implements code common to readers of
    * {@code FileBasedSource}s.

http://git-wip-us.apache.org/repos/asf/beam/blob/db9aede2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
index bd4668f..96394b8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
@@ -54,6 +54,7 @@ import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.io.fs.CreateOptions;
 import org.apache.beam.sdk.io.fs.CreateOptions.StandardCreateOptions;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
 import org.apache.beam.sdk.io.fs.MatchResult;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.io.fs.MatchResult.Status;
@@ -72,6 +73,8 @@ public class FileSystems {
   public static final String DEFAULT_SCHEME = "file";
   private static final Pattern FILE_SCHEME_PATTERN =
       Pattern.compile("(?<scheme>[a-zA-Z][-a-zA-Z0-9+.]*):.*");
+  /** Characters whose presence makes a spec a glob; consulted by {@link #hasGlobWildcard}. */
+  private static final Pattern GLOB_PATTERN = Pattern.compile("[*?{}]");
 
   private static final AtomicReference<Map<String, FileSystem>> SCHEME_TO_FILESYSTEM =
       new AtomicReference<Map<String, FileSystem>>(
@@ -79,6 +82,11 @@ public class FileSystems {
 
   /********************************** METHODS FOR CLIENT **********************************/
 
+  /** Returns {@code true} iff {@code spec} contains {@code *}, {@code ?}, or a curly brace. */
+  public static boolean hasGlobWildcard(String spec) {
+    return GLOB_PATTERN.matcher(spec).find();
+  }
+
   /**
    * This is the entry point to convert user-provided specs to {@link ResourceId ResourceIds}.
    * Callers should use {@link #match} to resolve users specs ambiguities before
@@ -102,6 +110,9 @@ public class FileSystems {
    * <p>In case the spec schemes don't match any known {@link FileSystem} implementations,
    * FileSystems will attempt to use {@link LocalFileSystem} to resolve a path.
    *
+   * <p>Specs that do not match any resources are treated according to
+   * {@link EmptyMatchTreatment#DISALLOW}.
+   *
    * @return {@code List<MatchResult>} in the same order of the input specs.
    *
    * @throws IllegalArgumentException if specs are invalid -- empty or have different schemes.
@@ -114,6 +125,17 @@ public class FileSystems {
     return getFileSystemInternal(getOnlyScheme(specs)).match(specs);
   }
 
+  /** Like {@link #match(List)}, but applies {@code emptyMatchTreatment} to every spec's result. */
+  public static List<MatchResult> match(List<String> specs, EmptyMatchTreatment emptyMatchTreatment)
+      throws IOException {
+    List<MatchResult> raw = getFileSystemInternal(getOnlyScheme(specs)).match(specs);
+    List<MatchResult> adjusted = Lists.newArrayListWithExpectedSize(raw.size());
+    for (int idx = 0, n = raw.size(); idx < n; ++idx) {
+      adjusted.add(maybeAdjustEmptyMatchResult(specs.get(idx), raw.get(idx), emptyMatchTreatment));
+    }
+    return adjusted;
+  }
+
 
   /**
    * Like {@link #match(List)}, but for a single resource specification.
@@ -130,6 +152,30 @@ public class FileSystems {
         matches);
     return matches.get(0);
   }
+
+  /** Like {@link #match(String)}, but applies {@code emptyMatchTreatment} to the result. */
+  public static MatchResult match(String spec, EmptyMatchTreatment emptyMatchTreatment)
+      throws IOException {
+    // Plain match(spec) treats empty matches as DISALLOW; relax that here if permitted.
+    return maybeAdjustEmptyMatchResult(spec, match(spec), emptyMatchTreatment);
+  }
+
+  /** Turns a NOT_FOUND {@code res} into an empty OK result if the treatment allows it. */
+  private static MatchResult maybeAdjustEmptyMatchResult(
+      String spec, MatchResult res, EmptyMatchTreatment emptyMatchTreatment) {
+    if (res.status() != Status.NOT_FOUND) {
+      return res;
+    }
+    boolean notFoundAllowed =
+        emptyMatchTreatment == EmptyMatchTreatment.ALLOW
+            || (emptyMatchTreatment == EmptyMatchTreatment.ALLOW_IF_WILDCARD
+                && hasGlobWildcard(spec));
+    // DISALLOW, or ALLOW_IF_WILDCARD on a spec without wildcards, keeps the NOT_FOUND result.
+    return notFoundAllowed
+        ? MatchResult.create(Status.OK, Collections.<Metadata>emptyList())
+        : res;
+  }
+
   /**
    * Returns the {@link Metadata} for a single file resource. Expects a resource specification
    * {@code spec} that matches a single result.

http://git-wip-us.apache.org/repos/asf/beam/blob/db9aede2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
index 86c73d5..29188dc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSource.java
@@ -28,6 +28,7 @@ import java.nio.channels.SeekableByteChannel;
 import java.util.NoSuchElementException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
 import org.apache.beam.sdk.io.fs.MatchResult;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
@@ -48,7 +49,11 @@ import org.apache.beam.sdk.options.ValueProvider;
 @VisibleForTesting
 class TextSource extends FileBasedSource<String> {
   TextSource(ValueProvider<String> fileSpec) {
-    super(fileSpec, 1L);
+    this(fileSpec, EmptyMatchTreatment.DISALLOW); // Keeps the old behavior: no match is an error.
+  }
+
+  TextSource(ValueProvider<String> fileSpec, EmptyMatchTreatment emptyMatchTreatment) {
+    super(fileSpec, emptyMatchTreatment, /* minBundleSize= */ 1L);
   }
 
   private TextSource(MatchResult.Metadata metadata, long start, long end) {

http://git-wip-us.apache.org/repos/asf/beam/blob/db9aede2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/EmptyMatchTreatment.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/EmptyMatchTreatment.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/EmptyMatchTreatment.java
new file mode 100644
index 0000000..8e12993
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/EmptyMatchTreatment.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.fs;
+
+import org.apache.beam.sdk.io.fs.MatchResult.Status;
+
+/**
+ * How {@link org.apache.beam.sdk.io.FileSystems#match} should treat filepatterns that match no
+ * resources.
+ */
+public enum EmptyMatchTreatment {
+  /**
+   * Filepatterns matching no resources are allowed. For such a filepattern, {@link
+   * MatchResult#status} will be {@link Status#OK} and {@link MatchResult#metadata} will return an
+   * empty list.
+   */
+  ALLOW,
+
+  /**
+   * Filepatterns matching no resources are disallowed. For such a filepattern, {@link
+   * MatchResult#status} will be {@link Status#NOT_FOUND} and {@link MatchResult#metadata} will
+   * throw a {@link java.io.FileNotFoundException}.
+   */
+  DISALLOW,
+
+  /**
+   * Like {@link #ALLOW} if the filepattern contains a glob wildcard (per {@link
+   * org.apache.beam.sdk.io.FileSystems#hasGlobWildcard}), otherwise like {@link #DISALLOW}.
+   */
+  ALLOW_IF_WILDCARD
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/db9aede2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java
index 642c049..aa80b96 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/fs/MatchResult.java
@@ -21,6 +21,7 @@ import com.google.auto.value.AutoValue;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
+import org.apache.beam.sdk.io.FileSystems;
 
 /**
  * The result of {@link org.apache.beam.sdk.io.FileSystem#match}.
@@ -78,7 +79,9 @@ public abstract class MatchResult {
   public abstract Status status();
 
   /**
-   * {@link Metadata} of matched files.
+   * {@link Metadata} of matched files. If the spec matched nothing, the result depends on the
+   * {@link EmptyMatchTreatment} passed to {@link FileSystems#match}: either {@link #status()} is
+   * {@link Status#OK} and this is empty, or it is {@link Status#NOT_FOUND} and this throws.
    */
   public abstract List<Metadata> metadata() throws IOException;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/db9aede2/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
index 1bdb915..ea9e06b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
@@ -47,6 +47,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.FileBasedSource.FileBasedReader;
 import org.apache.beam.sdk.io.Source.Reader;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -94,6 +95,15 @@ public class FileBasedSourceTest {
     }
 
     public TestFileBasedSource(
+        String fileOrPattern, // Wrapped in a StaticValueProvider for the superclass.
+        EmptyMatchTreatment emptyMatchTreatment, // How a pattern matching no files is treated.
+        long minBundleSize,
+        String splitHeader) {
+      super(StaticValueProvider.of(fileOrPattern), emptyMatchTreatment, minBundleSize);
+      this.splitHeader = splitHeader;
+    }
+
+    public TestFileBasedSource(
         Metadata fileOrPattern,
         long minBundleSize,
         long startOffset,
@@ -371,6 +381,47 @@ public class FileBasedSourceTest {
   }
 
   @Test
+  public void testEmptyFilepatternTreatmentDefaultDisallow() throws IOException {
+    // Without an explicit EmptyMatchTreatment, a pattern matching no files must be rejected.
+    String missingFile = new File(tempFolder.getRoot(), "doesNotExist").getPath();
+    TestFileBasedSource source = new TestFileBasedSource(missingFile, 64, null);
+    thrown.expect(FileNotFoundException.class);
+    readFromSource(source, PipelineOptionsFactory.create());
+  }
+
+  @Test
+  public void testEmptyFilepatternTreatmentAllow() throws IOException {
+    PipelineOptions options = PipelineOptionsFactory.create();
+    File root = tempFolder.getRoot();
+    // ALLOW: a pattern that matches nothing reads as an empty source.
+    TestFileBasedSource source =
+        new TestFileBasedSource(
+            new File(root, "doesNotExist").getPath(), EmptyMatchTreatment.ALLOW, 64, null);
+    // ALLOW_IF_WILDCARD: likewise, because this pattern contains a '*' wildcard.
+    TestFileBasedSource sourceWithWildcard =
+        new TestFileBasedSource(
+            new File(root, "doesNotExist*").getPath(),
+            EmptyMatchTreatment.ALLOW_IF_WILDCARD,
+            64,
+            null);
+    assertEquals(0, readFromSource(source, options).size());
+    assertEquals(0, readFromSource(sourceWithWildcard, options).size());
+  }
+
+  @Test
+  public void testEmptyFilepatternTreatmentAllowIfWildcard() throws IOException {
+    // A pattern without a wildcard names a single file, so ALLOW_IF_WILDCARD treats an empty
+    // match like DISALLOW: reading must fail with FileNotFoundException.
+    String missingFile = new File(tempFolder.getRoot(), "doesNotExist").getPath();
+    TestFileBasedSource source =
+        new TestFileBasedSource(missingFile, EmptyMatchTreatment.ALLOW_IF_WILDCARD, 64, null);
+    PipelineOptions options = PipelineOptionsFactory.create();
+
+    thrown.expect(FileNotFoundException.class);
+    readFromSource(source, options);
+  }
+
+  @Test
   public void testCloseUnstartedFilePatternReader() throws IOException {
     PipelineOptions options = PipelineOptionsFactory.create();
     List<String> data1 = createStringDataset(3, 50);