You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/30 19:11:36 UTC

[1/6] beam git commit: Adds AvroIO watchForNewFiles

Repository: beam
Updated Branches:
  refs/heads/master d64f2cce8 -> 5c2cab017


Adds AvroIO watchForNewFiles


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f1f39871
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f1f39871
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f1f39871

Branch: refs/heads/master
Commit: f1f39871da3668bb2ffbc1c27449d36c995b645b
Parents: 82b0852
Author: Eugene Kirpichov <ek...@gmail.com>
Authored: Wed Aug 16 14:41:32 2017 -0700
Committer: Eugene Kirpichov <ek...@gmail.com>
Committed: Wed Aug 30 11:55:18 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 132 ++++++++++++++++++-
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  94 +++++++++++--
 2 files changed, 212 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/f1f39871/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 9601a7d..f6f3308 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.beam.sdk.transforms.Watch.Growth.ignoreInput;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
@@ -48,12 +49,14 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.sdk.values.TypeDescriptors;
+import org.joda.time.Duration;
 
 /**
  * {@link PTransform}s for reading and writing Avro files.
@@ -76,6 +79,9 @@ import org.apache.beam.sdk.values.TypeDescriptors;
  * allows them in case the filepattern contains a glob wildcard character. Use {@link
  * Read#withEmptyMatchTreatment} to configure this behavior.
  *
+ * <p>By default, the filepatterns are expanded only once. {@link Read#watchForNewFiles}
+ * allows streaming of new files matching the filepattern(s).
+ *
  * <h3>Reading records of a known schema</h3>
  *
  * <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}. To read
@@ -137,6 +143,20 @@ import org.apache.beam.sdk.values.TypeDescriptors;
  *     filepatterns.apply(AvroIO.parseAllGenericRecords(new SerializableFunction...);
  * }</pre>
  *
+ * <h3>Streaming new files matching a filepattern</h3>
+ * <pre>{@code
+ * Pipeline p = ...;
+ *
+ * PCollection<AvroAutoGenClass> lines = p.apply(AvroIO
+ *     .read(AvroAutoGenClass.class)
+ *     .from("gs://my_bucket/path/to/records-*.avro")
+ *     .watchForNewFiles(
+ *       // Check for new files every minute
+ *       Duration.standardMinutes(1),
+ *       // Stop watching the filepattern if no new files appear within an hour
+ *       afterTimeSinceNewOutput(Duration.standardHours(1))));
+ * }</pre>
+ *
  * <h3>Reading a very large number of files</h3>
  *
  * <p>If it is known that the filepattern will match a very large number of files (e.g. tens of
@@ -406,6 +426,8 @@ public class AvroIO {
   public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
     @Nullable abstract ValueProvider<String> getFilepattern();
     abstract EmptyMatchTreatment getEmptyMatchTreatment();
+    @Nullable abstract Duration getWatchForNewFilesInterval();
+    @Nullable abstract TerminationCondition<?, ?> getWatchForNewFilesTerminationCondition();
     @Nullable abstract Class<T> getRecordClass();
     @Nullable abstract Schema getSchema();
     abstract boolean getHintMatchesManyFiles();
@@ -416,6 +438,9 @@ public class AvroIO {
     abstract static class Builder<T> {
       abstract Builder<T> setFilepattern(ValueProvider<String> filepattern);
       abstract Builder<T> setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment);
+      abstract Builder<T> setWatchForNewFilesInterval(Duration watchForNewFilesInterval);
+      abstract Builder<T> setWatchForNewFilesTerminationCondition(
+              TerminationCondition<?, ?> condition);
       abstract Builder<T> setRecordClass(Class<T> recordClass);
       abstract Builder<T> setSchema(Schema schema);
       abstract Builder<T> setHintMatchesManyFiles(boolean hintManyFiles);
@@ -446,6 +471,24 @@ public class AvroIO {
     }
 
     /**
+     * Continuously watches for new files matching the filepattern, polling it at the given
+     * interval, until the given termination condition is reached. The returned {@link PCollection}
+     * is unbounded.
+     *
+     * <p>This works only in runners supporting {@link Kind#SPLITTABLE_DO_FN}.
+     *
+     * @see TerminationCondition
+     */
+    @Experimental(Kind.SPLITTABLE_DO_FN)
+    public Read<T> watchForNewFiles(
+        Duration pollInterval, TerminationCondition<?, ?> terminationCondition) {
+      return toBuilder()
+          .setWatchForNewFilesInterval(pollInterval)
+          .setWatchForNewFilesTerminationCondition(terminationCondition)
+          .build();
+    }
+
+    /**
      * Hints that the filepattern specified in {@link #from(String)} matches a very large number of
      * files.
      *
@@ -463,7 +506,7 @@ public class AvroIO {
       checkNotNull(getFilepattern(), "filepattern");
       checkNotNull(getSchema(), "schema");
 
-      if (!getHintMatchesManyFiles()) {
+      if (getWatchForNewFilesInterval() == null && !getHintMatchesManyFiles()) {
         return input.apply(
             "Read",
             org.apache.beam.sdk.io.Read.from(
@@ -477,6 +520,11 @@ public class AvroIO {
               ? (ReadAll<T>) readAllGenericRecords(getSchema())
               : readAll(getRecordClass());
       readAll = readAll.withEmptyMatchTreatment(getEmptyMatchTreatment());
+      if (getWatchForNewFilesInterval() != null) {
+        TerminationCondition<String, ?> readAllCondition =
+            ignoreInput(getWatchForNewFilesTerminationCondition());
+        readAll = readAll.watchForNewFiles(getWatchForNewFilesInterval(), readAllCondition);
+      }
       return input
           .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
           .apply("Via ReadAll", readAll);
@@ -490,7 +538,10 @@ public class AvroIO {
               DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"))
           .add(
               DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString())
-                  .withLabel("Treatment of filepatterns that match no files"));
+                  .withLabel("Treatment of filepatterns that match no files"))
+          .addIfNotNull(
+              DisplayData.item("watchForNewFilesInterval", getWatchForNewFilesInterval())
+                  .withLabel("Interval to watch for new files"));
     }
 
     @SuppressWarnings("unchecked")
@@ -513,6 +564,8 @@ public class AvroIO {
   @AutoValue
   public abstract static class ReadAll<T> extends PTransform<PCollection<String>, PCollection<T>> {
     abstract EmptyMatchTreatment getEmptyMatchTreatment();
+    @Nullable abstract Duration getWatchForNewFilesInterval();
+    @Nullable abstract TerminationCondition<String, ?> getWatchForNewFilesTerminationCondition();
     @Nullable abstract Class<T> getRecordClass();
     @Nullable abstract Schema getSchema();
     abstract long getDesiredBundleSizeBytes();
@@ -522,6 +575,9 @@ public class AvroIO {
     @AutoValue.Builder
     abstract static class Builder<T> {
       abstract Builder<T> setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment);
+      abstract Builder<T> setWatchForNewFilesInterval(Duration watchForNewFilesInterval);
+      abstract Builder<T> setWatchForNewFilesTerminationCondition(
+              TerminationCondition<String, ?> condition);
       abstract Builder<T> setRecordClass(Class<T> recordClass);
       abstract Builder<T> setSchema(Schema schema);
       abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
@@ -534,6 +590,16 @@ public class AvroIO {
       return toBuilder().setEmptyMatchTreatment(treatment).build();
     }
 
+    /** Like {@link Read#watchForNewFiles}. */
+    @Experimental(Kind.SPLITTABLE_DO_FN)
+    public ReadAll<T> watchForNewFiles(
+        Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
+      return toBuilder()
+              .setWatchForNewFilesInterval(pollInterval)
+              .setWatchForNewFilesTerminationCondition(terminationCondition)
+              .build();
+    }
+
     @VisibleForTesting
     ReadAll<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
       return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
@@ -544,6 +610,11 @@ public class AvroIO {
       checkNotNull(getSchema(), "schema");
       Match.Filepatterns matchFilepatterns =
           Match.filepatterns().withEmptyMatchTreatment(getEmptyMatchTreatment());
+      if (getWatchForNewFilesInterval() != null) {
+        matchFilepatterns =
+            matchFilepatterns.continuously(
+                getWatchForNewFilesInterval(), getWatchForNewFilesTerminationCondition());
+      }
 
       return input
           .apply(matchFilepatterns)
@@ -563,7 +634,10 @@ public class AvroIO {
       builder
           .add(
               DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString())
-                  .withLabel("Treatment of filepatterns that match no files"));
+                  .withLabel("Treatment of filepatterns that match no files"))
+          .addIfNotNull(
+              DisplayData.item("watchForNewFilesInterval", getWatchForNewFilesInterval())
+                  .withLabel("Interval to watch for new files"));
     }
   }
 
@@ -594,6 +668,8 @@ public class AvroIO {
   public abstract static class Parse<T> extends PTransform<PBegin, PCollection<T>> {
     @Nullable abstract ValueProvider<String> getFilepattern();
     abstract EmptyMatchTreatment getEmptyMatchTreatment();
+    @Nullable abstract Duration getWatchForNewFilesInterval();
+    @Nullable abstract TerminationCondition<?, ?> getWatchForNewFilesTerminationCondition();
     abstract SerializableFunction<GenericRecord, T> getParseFn();
     @Nullable abstract Coder<T> getCoder();
     abstract boolean getHintMatchesManyFiles();
@@ -604,6 +680,9 @@ public class AvroIO {
     abstract static class Builder<T> {
       abstract Builder<T> setFilepattern(ValueProvider<String> filepattern);
       abstract Builder<T> setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment);
+      abstract Builder<T> setWatchForNewFilesInterval(Duration watchForNewFilesInterval);
+      abstract Builder<T> setWatchForNewFilesTerminationCondition(
+              TerminationCondition<?, ?> condition);
       abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn);
       abstract Builder<T> setCoder(Coder<T> coder);
       abstract Builder<T> setHintMatchesManyFiles(boolean hintMatchesManyFiles);
@@ -626,6 +705,16 @@ public class AvroIO {
       return toBuilder().setEmptyMatchTreatment(treatment).build();
     }
 
+    /** Like {@link Read#watchForNewFiles}. */
+    @Experimental(Kind.SPLITTABLE_DO_FN)
+    public Parse<T> watchForNewFiles(
+        Duration pollInterval, TerminationCondition<?, ?> terminationCondition) {
+      return toBuilder()
+          .setWatchForNewFilesInterval(pollInterval)
+          .setWatchForNewFilesTerminationCondition(terminationCondition)
+          .build();
+    }
+
     /** Sets a coder for the result of the parse function. */
     public Parse<T> withCoder(Coder<T> coder) {
       return toBuilder().setCoder(coder).build();
@@ -641,7 +730,7 @@ public class AvroIO {
       checkNotNull(getFilepattern(), "filepattern");
       Coder<T> coder = inferCoder(getCoder(), getParseFn(), input.getPipeline().getCoderRegistry());
 
-      if (!getHintMatchesManyFiles()) {
+      if (getWatchForNewFilesInterval() == null && !getHintMatchesManyFiles()) {
         return input.apply(
                 org.apache.beam.sdk.io.Read.from(
                         AvroSource.from(getFilepattern()).withParseFn(getParseFn(), coder)));
@@ -651,6 +740,11 @@ public class AvroIO {
           parseAllGenericRecords(getParseFn())
               .withCoder(coder)
               .withEmptyMatchTreatment(getEmptyMatchTreatment());
+      if (getWatchForNewFilesInterval() != null) {
+        TerminationCondition<String, ?> parseAllCondition =
+            ignoreInput(getWatchForNewFilesTerminationCondition());
+        parseAll = parseAll.watchForNewFiles(getWatchForNewFilesInterval(), parseAllCondition);
+      }
       return input
           .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
           .apply("Via ParseAll", parseAll);
@@ -684,7 +778,10 @@ public class AvroIO {
           .add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function"))
           .add(
               DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString())
-                  .withLabel("Treatment of filepatterns that match no files"));
+                  .withLabel("Treatment of filepatterns that match no files"))
+          .addIfNotNull(
+              DisplayData.item("watchForNewFilesInterval", getWatchForNewFilesInterval())
+                  .withLabel("Interval to watch for new files"));
     }
   }
 
@@ -694,6 +791,8 @@ public class AvroIO {
   @AutoValue
   public abstract static class ParseAll<T> extends PTransform<PCollection<String>, PCollection<T>> {
     abstract EmptyMatchTreatment getEmptyMatchTreatment();
+    @Nullable abstract Duration getWatchForNewFilesInterval();
+    @Nullable abstract TerminationCondition<String, ?> getWatchForNewFilesTerminationCondition();
     abstract SerializableFunction<GenericRecord, T> getParseFn();
     @Nullable abstract Coder<T> getCoder();
     abstract long getDesiredBundleSizeBytes();
@@ -703,6 +802,9 @@ public class AvroIO {
     @AutoValue.Builder
     abstract static class Builder<T> {
       abstract Builder<T> setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment);
+      abstract Builder<T> setWatchForNewFilesInterval(Duration watchForNewFilesInterval);
+      abstract Builder<T> setWatchForNewFilesTerminationCondition(
+          TerminationCondition<String, ?> condition);
       abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn);
       abstract Builder<T> setCoder(Coder<T> coder);
       abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
@@ -715,6 +817,16 @@ public class AvroIO {
       return toBuilder().setEmptyMatchTreatment(treatment).build();
     }
 
+    /** Like {@link Read#watchForNewFiles}. */
+    @Experimental(Kind.SPLITTABLE_DO_FN)
+    public ParseAll<T> watchForNewFiles(
+        Duration pollInterval, TerminationCondition<String, ?> terminationCondition) {
+      return toBuilder()
+          .setWatchForNewFilesInterval(pollInterval)
+          .setWatchForNewFilesTerminationCondition(terminationCondition)
+          .build();
+    }
+
     /** Specifies the coder for the result of the {@code parseFn}. */
     public ParseAll<T> withCoder(Coder<T> coder) {
       return toBuilder().setCoder(coder).build();
@@ -742,6 +854,11 @@ public class AvroIO {
           };
       Match.Filepatterns matchFilepatterns =
               Match.filepatterns().withEmptyMatchTreatment(emptyMatchTreatment);
+      if (getWatchForNewFilesInterval() != null) {
+        matchFilepatterns =
+                matchFilepatterns.continuously(
+                        getWatchForNewFilesInterval(), getWatchForNewFilesTerminationCondition());
+      }
       return input
           .apply(matchFilepatterns)
           .apply(
@@ -760,7 +877,10 @@ public class AvroIO {
           .add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function"))
           .add(
               DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString())
-                  .withLabel("Treatment of filepatterns that match no files"));
+                  .withLabel("Treatment of filepatterns that match no files"))
+          .addIfNotNull(
+              DisplayData.item("watchForNewFilesInterval", getWatchForNewFilesInterval())
+                  .withLabel("Interval to watch for new files"));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/f1f39871/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index d0aa02c..f49443d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -57,6 +57,7 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.reflect.Nullable;
 import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -73,8 +74,11 @@ import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.testing.UsesTestStream;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.Watch;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -260,19 +264,84 @@ public class AvroIOTest {
                 .withNumShards(3));
     writePipeline.run().waitUntilFinish();
 
-    // Test read(), readAll(), and parseAllGenericRecords().
+    // Test readAll() and parseAllGenericRecords().
+    PCollection<String> paths =
+        readPipeline.apply(
+            "Create paths",
+            Create.of(
+                tmpFolder.getRoot().getAbsolutePath() + "/first*",
+                tmpFolder.getRoot().getAbsolutePath() + "/second*"));
+    PAssert.that(
+            paths.apply(
+                "Read all", AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10)))
+        .containsInAnyOrder(Iterables.concat(firstValues, secondValues));
+    PAssert.that(
+            paths.apply(
+                "Parse all",
+                AvroIO.parseAllGenericRecords(new ParseGenericClass())
+                    .withCoder(AvroCoder.of(GenericClass.class))
+                    .withDesiredBundleSizeBytes(10)))
+        .containsInAnyOrder(Iterables.concat(firstValues, secondValues));
+
+    readPipeline.run();
+  }
+
+  private static class CreateGenericClass extends SimpleFunction<Long, GenericClass> {
+    @Override
+    public GenericClass apply(Long i) {
+      return new GenericClass(i.intValue(), "value" + i);
+    }
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void testAvroIOContinuouslyWriteAndReadMultipleFilepatterns() throws Throwable {
+    SimpleFunction<Long, GenericClass> mapFn = new CreateGenericClass();
+    List<GenericClass> firstValues = Lists.newArrayList();
+    List<GenericClass> secondValues = Lists.newArrayList();
+    for (int i = 0; i < 7; ++i) {
+      (i < 3 ? firstValues : secondValues).add(mapFn.apply((long) i));
+    }
+    writePipeline.apply(
+            "Sequence first",
+            GenerateSequence.from(0).to(3).withRate(1, Duration.millis(300)))
+        .apply("Map first", MapElements.via(mapFn))
+        .apply(
+            "Write first",
+            AvroIO.write(GenericClass.class)
+                .to(tmpFolder.getRoot().getAbsolutePath() + "/first")
+                .withNumShards(2));
+    writePipeline.apply(
+            "Sequence second",
+            GenerateSequence.from(3).to(7).withRate(1, Duration.millis(300)))
+        .apply("Map second", MapElements.via(mapFn))
+        .apply(
+            "Write second",
+            AvroIO.write(GenericClass.class)
+                .to(tmpFolder.getRoot().getAbsolutePath() + "/second")
+                .withNumShards(3));
+    PipelineResult writeRes = writePipeline.run();
+
+    // Test read(), readAll(), parse(), and parseAllGenericRecords() with watchForNewFiles().
     PAssert.that(
             readPipeline.apply(
-                "Read first",
+                "Read",
                 AvroIO.read(GenericClass.class)
-                    .from(tmpFolder.getRoot().getAbsolutePath() + "/first*")))
+                    .from(tmpFolder.getRoot().getAbsolutePath() + "/first*")
+                    .watchForNewFiles(
+                        Duration.millis(100),
+                        Watch.Growth.<Void>afterTimeSinceNewOutput(Duration.standardSeconds(3)))))
         .containsInAnyOrder(firstValues);
     PAssert.that(
             readPipeline.apply(
-                "Read second",
-                AvroIO.read(GenericClass.class)
-                    .from(tmpFolder.getRoot().getAbsolutePath() + "/second*")))
-        .containsInAnyOrder(secondValues);
+                "Parse",
+                AvroIO.parseGenericRecords(new ParseGenericClass())
+                    .from(tmpFolder.getRoot().getAbsolutePath() + "/first*")
+                    .watchForNewFiles(
+                        Duration.millis(100),
+                        Watch.Growth.<Void>afterTimeSinceNewOutput(Duration.standardSeconds(3)))))
+        .containsInAnyOrder(firstValues);
+
     PCollection<String> paths =
         readPipeline.apply(
             "Create paths",
@@ -281,17 +350,26 @@ public class AvroIOTest {
                 tmpFolder.getRoot().getAbsolutePath() + "/second*"));
     PAssert.that(
             paths.apply(
-                "Read all", AvroIO.readAll(GenericClass.class).withDesiredBundleSizeBytes(10)))
+                "Read all",
+                AvroIO.readAll(GenericClass.class)
+                    .watchForNewFiles(
+                        Duration.millis(100),
+                        Watch.Growth.<String>afterTimeSinceNewOutput(Duration.standardSeconds(3)))
+                    .withDesiredBundleSizeBytes(10)))
         .containsInAnyOrder(Iterables.concat(firstValues, secondValues));
     PAssert.that(
             paths.apply(
                 "Parse all",
                 AvroIO.parseAllGenericRecords(new ParseGenericClass())
                     .withCoder(AvroCoder.of(GenericClass.class))
+                    .watchForNewFiles(
+                        Duration.millis(100),
+                        Watch.Growth.<String>afterTimeSinceNewOutput(Duration.standardSeconds(3)))
                     .withDesiredBundleSizeBytes(10)))
         .containsInAnyOrder(Iterables.concat(firstValues, secondValues));
 
     readPipeline.run();
+    writeRes.waitUntilFinish();
   }
 
   @Test


[2/6] beam git commit: Gets rid of raw type in TextIO.Read.watchForNewFiles

Posted by jk...@apache.org.
Gets rid of raw type in TextIO.Read.watchForNewFiles


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/184f7a9b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/184f7a9b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/184f7a9b

Branch: refs/heads/master
Commit: 184f7a9b31641641cdb4bc7ddcf3556c0514f71b
Parents: d64f2cc
Author: Eugene Kirpichov <ek...@gmail.com>
Authored: Wed Aug 16 14:25:33 2017 -0700
Committer: Eugene Kirpichov <ek...@gmail.com>
Committed: Wed Aug 30 11:55:18 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/TextIO.java     | 15 ++++---
 .../org/apache/beam/sdk/transforms/Watch.java   | 42 ++++++++++++++++++++
 2 files changed, 51 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/184f7a9b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index cbc17ff..835008f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
+import static org.apache.beam.sdk.transforms.Watch.Growth.ignoreInput;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
@@ -250,7 +251,7 @@ public class TextIO {
     abstract Duration getWatchForNewFilesInterval();
 
     @Nullable
-    abstract TerminationCondition getWatchForNewFilesTerminationCondition();
+    abstract TerminationCondition<?, ?> getWatchForNewFilesTerminationCondition();
 
     abstract boolean getHintMatchesManyFiles();
     abstract EmptyMatchTreatment getEmptyMatchTreatment();
@@ -262,7 +263,8 @@ public class TextIO {
       abstract Builder setFilepattern(ValueProvider<String> filepattern);
       abstract Builder setCompressionType(CompressionType compressionType);
       abstract Builder setWatchForNewFilesInterval(Duration watchForNewFilesInterval);
-      abstract Builder setWatchForNewFilesTerminationCondition(TerminationCondition condition);
+      abstract Builder setWatchForNewFilesTerminationCondition(
+              TerminationCondition<?, ?> condition);
       abstract Builder setHintMatchesManyFiles(boolean hintManyFiles);
       abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment);
 
@@ -312,7 +314,8 @@ public class TextIO {
      * @see TerminationCondition
      */
     @Experimental(Kind.SPLITTABLE_DO_FN)
-    public Read watchForNewFiles(Duration pollInterval, TerminationCondition terminationCondition) {
+    public Read watchForNewFiles(
+        Duration pollInterval, TerminationCondition<?, ?> terminationCondition) {
       return toBuilder()
           .setWatchForNewFilesInterval(pollInterval)
           .setWatchForNewFilesTerminationCondition(terminationCondition)
@@ -352,9 +355,9 @@ public class TextIO {
               .withCompressionType(getCompressionType())
               .withEmptyMatchTreatment(getEmptyMatchTreatment());
       if (getWatchForNewFilesInterval() != null) {
-        readAll =
-            readAll.watchForNewFiles(
-                getWatchForNewFilesInterval(), getWatchForNewFilesTerminationCondition());
+        TerminationCondition<String, ?> readAllCondition =
+            ignoreInput(getWatchForNewFilesTerminationCondition());
+        readAll = readAll.watchForNewFiles(getWatchForNewFilesInterval(), readAllCondition);
       }
       return input
           .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))

http://git-wip-us.apache.org/repos/asf/beam/blob/184f7a9b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
index 9da2408..21f0641 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
@@ -264,6 +264,15 @@ public class Watch {
     }
 
     /**
+     * Wraps a given input-independent {@link TerminationCondition} as an equivalent condition
+     * with a given input type, passing {@code null} to the original condition as input.
+     */
+    public static <InputT, StateT> TerminationCondition<InputT, StateT> ignoreInput(
+        TerminationCondition<?, StateT> condition) {
+      return new IgnoreInput<>(condition);
+    }
+
+    /**
      * Returns a {@link TerminationCondition} that holds after the given time has elapsed after the
      * current input was seen.
      */
@@ -344,6 +353,39 @@ public class Watch {
       }
     }
 
+    static class IgnoreInput<InputT, StateT> implements TerminationCondition<InputT, StateT> {
+      private final TerminationCondition<?, StateT> wrapped;
+
+      IgnoreInput(TerminationCondition<?, StateT> wrapped) {
+        this.wrapped = wrapped;
+      }
+
+      @Override
+      public Coder<StateT> getStateCoder() {
+        return wrapped.getStateCoder();
+      }
+
+      @Override
+      public StateT forNewInput(Instant now, InputT input) {
+        return wrapped.forNewInput(now, null);
+      }
+
+      @Override
+      public StateT onSeenNewOutput(Instant now, StateT state) {
+        return wrapped.onSeenNewOutput(now, state);
+      }
+
+      @Override
+      public boolean canStopPolling(Instant now, StateT state) {
+        return wrapped.canStopPolling(now, state);
+      }
+
+      @Override
+      public String toString(StateT state) {
+        return wrapped.toString(state);
+      }
+    }
+
     static class AfterTotalOf<InputT>
         implements TerminationCondition<
             InputT, KV<Instant /* timeStarted */, ReadableDuration /* maxTimeSinceInput */>> {


[4/6] beam git commit: Better-organized javadocs for TextIO and AvroIO

Posted by jk...@apache.org.
Better-organized javadocs for TextIO and AvroIO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/84eb7f3a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/84eb7f3a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/84eb7f3a

Branch: refs/heads/master
Commit: 84eb7f3ae431b467828a76e305123601d4ee333a
Parents: 184f7a9
Author: Eugene Kirpichov <ek...@gmail.com>
Authored: Wed Aug 16 14:29:52 2017 -0700
Committer: Eugene Kirpichov <ek...@gmail.com>
Committed: Wed Aug 30 11:55:18 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 83 +++++++++++++-------
 .../java/org/apache/beam/sdk/io/TextIO.java     | 30 ++++---
 2 files changed, 75 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/84eb7f3a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 9e0422e..d4a7cbb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -57,13 +57,20 @@ import org.apache.beam.sdk.values.TypeDescriptors;
 /**
  * {@link PTransform}s for reading and writing Avro files.
  *
+ * <h2>Reading Avro files</h2>
+ *
  * <p>To read a {@link PCollection} from one or more Avro files with the same schema known at
- * pipeline construction time, use {@code AvroIO.read()}, using {@link AvroIO.Read#from} to specify
- * the filename or filepattern to read from. Alternatively, if the filepatterns to be read are
- * themselves in a {@link PCollection}, apply {@link #readAll}.
+ * pipeline construction time, use {@link #read}, using {@link AvroIO.Read#from} to specify the
+ * filename or filepattern to read from. If the filepatterns to be read are themselves in a {@link
+ * PCollection}, apply {@link #readAll}. If the schema is unknown at pipeline construction time, use
+ * {@link #parseGenericRecords} or {@link #parseAllGenericRecords}.
+ *
+ * <p>Many configuration options below apply to several or all of these transforms.
  *
  * <p>See {@link FileSystems} for information on supported file systems and filepatterns.
  *
+ * <h3>Reading records of a known schema</h3>
+ *
  * <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}. To read
  * {@link GenericRecord GenericRecords}, use {@link #readGenericRecords(Schema)} which takes a
  * {@link Schema} object, or {@link #readGenericRecords(String)} which takes an Avro schema in a
@@ -71,26 +78,34 @@ import org.apache.beam.sdk.values.TypeDescriptors;
  * schema. Likewise, to read a {@link PCollection} of filepatterns, apply {@link
  * #readAllGenericRecords}.
  *
- * <p>To read records from files whose schema is unknown at pipeline construction time or differs
- * between files, use {@link #parseGenericRecords} - in this case, you will need to specify a
- * parsing function for converting each {@link GenericRecord} into a value of your custom type.
- * Likewise, to read a {@link PCollection} of filepatterns with unknown schema, use {@link
- * #parseAllGenericRecords}.
- *
  * <p>For example:
  *
  * <pre>{@code
  * Pipeline p = ...;
  *
- * // A simple Read of a local file (only runs locally):
+ * // Read Avro-generated classes from files on GCS
  * PCollection<AvroAutoGenClass> records =
- *     p.apply(AvroIO.read(AvroAutoGenClass.class).from("/path/to/file.avro"));
+ *     p.apply(AvroIO.read(AvroAutoGenClass.class).from("gs://my_bucket/path/to/records-*.avro"));
  *
- * // A Read from a GCS file (runs locally and using remote execution):
+ * // Read GenericRecord's of the given schema from files on GCS
  * Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
  * PCollection<GenericRecord> records =
  *     p.apply(AvroIO.readGenericRecords(schema)
  *                .from("gs://my_bucket/path/to/records-*.avro"));
+ * }</pre>
+ *
+ * <h3>Reading records of an unknown schema</h3>
+ *
+ * <p>To read records from files whose schema is unknown at pipeline construction time or differs
+ * between files, use {@link #parseGenericRecords} - in this case, you will need to specify a
+ * parsing function for converting each {@link GenericRecord} into a value of your custom type.
+ * Likewise, to read a {@link PCollection} of filepatterns with unknown schema, use {@link
+ * #parseAllGenericRecords}.
+ *
+ * <p>For example:
+ *
+ * <pre>{@code
+ * Pipeline p = ...;
  *
  * PCollection<Foo> records =
  *     p.apply(AvroIO.parseGenericRecords(new SerializableFunction<GenericRecord, Foo>() {
@@ -101,12 +116,7 @@ import org.apache.beam.sdk.values.TypeDescriptors;
  *     }));
  * }</pre>
  *
- * <p>If it is known that the filepattern will match a very large number of files (e.g. tens of
- * thousands or more), use {@link Read#withHintMatchesManyFiles} or {@link
- * Parse#withHintMatchesManyFiles} for better performance and scalability. Note that it may decrease
- * performance if the filepattern matches only a small number of files.
- *
- * <p>Reading from a {@link PCollection} of filepatterns:
+ * <h3>Reading from a {@link PCollection} of filepatterns</h3>
  *
  * <pre>{@code
  * Pipeline p = ...;
@@ -120,6 +130,15 @@ import org.apache.beam.sdk.values.TypeDescriptors;
  *     filepatterns.apply(AvroIO.parseAllGenericRecords(new SerializableFunction...);
  * }</pre>
  *
+ * <h3>Reading a very large number of files</h3>
+ *
+ * <p>If it is known that the filepattern will match a very large number of files (e.g. tens of
+ * thousands or more), use {@link Read#withHintMatchesManyFiles} for better performance and
+ * scalability. Note that it may decrease performance if the filepattern matches only a small number
+ * of files.
+ *
+ * <h2>Writing Avro files</h2>
+ *
  * <p>To write a {@link PCollection} to one or more Avro files, use {@link AvroIO.Write}, using
  * {@code AvroIO.write().to(String)} to specify the output filename prefix. The default {@link
  * DefaultFilenamePolicy} will use this prefix, in conjunction with a {@link ShardNameTemplate} (set
@@ -128,13 +147,11 @@ import org.apache.beam.sdk.values.TypeDescriptors;
  * default write filename policy using {@link Write#to(FileBasedSink.FilenamePolicy)} to specify a
  * custom file naming policy.
  *
- * <p>By default, all input is put into the global window before writing. If per-window writes are
- * desired - for example, when using a streaming runner - {@link AvroIO.Write#withWindowedWrites()}
- * will cause windowing and triggering to be preserved. When producing windowed writes with a
- * streaming runner that supports triggers, the number of output shards must be set explicitly using
- * {@link AvroIO.Write#withNumShards(int)}; some runners may set this for you to a runner-chosen
- * value, so you may need not set it yourself. A {@link FileBasedSink.FilenamePolicy} must be set,
- * and unique windows and triggers must produce unique filenames.
+ * <p>By default, {@link AvroIO.Write} produces output files that are compressed using the {@link
+ * org.apache.avro.file.Codec CodecFactory.deflateCodec(6)}. This default can be changed or
+ * overridden using {@link AvroIO.Write#withCodec}.
+ *
+ * <h3>Writing specific or generic records</h3>
  *
  * <p>To write specific records, such as Avro-generated classes, use {@link #write(Class)}. To write
  * {@link GenericRecord GenericRecords}, use either {@link #writeGenericRecords(Schema)} which takes
@@ -157,6 +174,18 @@ import org.apache.beam.sdk.values.TypeDescriptors;
  *     .withSuffix(".avro"));
  * }</pre>
  *
+ * <h3>Writing windowed or unbounded data</h3>
+ *
+ * <p>By default, all input is put into the global window before writing. If per-window writes are
+ * desired - for example, when using a streaming runner - {@link AvroIO.Write#withWindowedWrites()}
+ * will cause windowing and triggering to be preserved. When producing windowed writes with a
+ * streaming runner that supports triggers, the number of output shards must be set explicitly using
+ * {@link AvroIO.Write#withNumShards(int)}; some runners may set this for you to a runner-chosen
+ * value, so you may need not set it yourself. A {@link FileBasedSink.FilenamePolicy} must be set,
+ * and unique windows and triggers must produce unique filenames.
+ *
+ * <h3>Writing data to multiple destinations</h3>
+ *
  * <p>The following shows a more-complex example of AvroIO.Write usage, generating dynamic file
  * destinations as well as a dynamic Avro schema per file. In this example, a PCollection of user
  * events (e.g. actions on a website) is written out to Avro files. Each event contains the user id
@@ -201,10 +230,6 @@ import org.apache.beam.sdk.values.TypeDescriptors;
  * events.apply("WriteAvros", AvroIO.<Integer>writeCustomTypeToGenericRecords()
  *     .to(new UserDynamicAvros()));
  * }</pre>
- *
- * <p>By default, {@link AvroIO.Write} produces output files that are compressed using the {@link
- * org.apache.avro.file.Codec CodecFactory.deflateCodec(6)}. This default can be changed or
- * overridden using {@link AvroIO.Write#withCodec}.
  */
 public class AvroIO {
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/84eb7f3a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 835008f..442e4d9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -56,6 +56,8 @@ import org.joda.time.Duration;
 /**
  * {@link PTransform}s for reading and writing text files.
  *
+ * <h2>Reading text files</h2>
+ *
  * <p>To read a {@link PCollection} from one or more text files, use {@code TextIO.read()} to
  * instantiate a transform and use {@link TextIO.Read#from(String)} to specify the path of the
  * file(s) to be read. Alternatively, if the filenames to be read are themselves in a {@link
@@ -64,6 +66,8 @@ import org.joda.time.Duration;
  * <p>{@link #read} returns a {@link PCollection} of {@link String Strings}, each corresponding to
  * one line of an input UTF-8 text file (split into lines delimited by '\n', '\r', or '\r\n').
  *
+ * <h3>Filepattern expansion and watching</h3>
+ *
  * <p>By default, the filepatterns are expanded only once. {@link Read#watchForNewFiles} and {@link
  * ReadAll#watchForNewFiles} allow streaming of new files matching the filepattern(s).
  *
@@ -81,11 +85,6 @@ import org.joda.time.Duration;
  * PCollection<String> lines = p.apply(TextIO.read().from("/local/path/to/file.txt"));
  * }</pre>
  *
- * <p>If it is known that the filepattern will match a very large number of files (e.g. tens of
- * thousands or more), use {@link Read#withHintMatchesManyFiles} for better performance and
- * scalability. Note that it may decrease performance if the filepattern matches only a small number
- * of files.
- *
  * <p>Example 2: reading a PCollection of filenames.
  *
  * <pre>{@code
@@ -113,6 +112,15 @@ import org.joda.time.Duration;
  *       afterTimeSinceNewOutput(Duration.standardHours(1))));
  * }</pre>
  *
+ * <h3>Reading a very large number of files</h3>
+ *
+ * <p>If it is known that the filepattern will match a very large number of files (e.g. tens of
+ * thousands or more), use {@link Read#withHintMatchesManyFiles} for better performance and
+ * scalability. Note that it may decrease performance if the filepattern matches only a small number
+ * of files.
+ *
+ * <h2>Writing text files</h2>
+ *
  * <p>To write a {@link PCollection} to one or more text files, use {@code TextIO.write()}, using
  * {@link TextIO.Write#to(String)} to specify the output prefix of the files to write.
  *
@@ -130,6 +138,13 @@ import org.joda.time.Duration;
  *      .withWritableByteChannelFactory(FileBasedSink.CompressionType.GZIP));
  * }</pre>
  *
+ * <p>Any existing files with the same names as generated output files will be overwritten.
+ *
+ * <p>If you want better control over how filenames are generated than the default policy allows, a
+ * custom {@link FilenamePolicy} can also be set using {@link TextIO.Write#to(FilenamePolicy)}.
+ *
+ * <h3>Writing windowed or unbounded data</h3>
+ *
  * <p>By default, all input is put into the global window before writing. If per-window writes are
  * desired - for example, when using a streaming runner - {@link TextIO.Write#withWindowedWrites()}
  * will cause windowing and triggering to be preserved. When producing windowed writes with a
@@ -140,8 +155,7 @@ import org.joda.time.Duration;
  * for the window and the pane; W is expanded into the window text, and P into the pane; the default
  * template will include both the window and the pane in the filename.
  *
- * <p>If you want better control over how filenames are generated than the default policy allows, a
- * custom {@link FilenamePolicy} can also be set using {@link TextIO.Write#to(FilenamePolicy)}.
+ * <h3>Writing data to multiple destinations</h3>
  *
  * <p>TextIO also supports dynamic, value-dependent file destinations. The most general form of this
  * is done via {@link TextIO.Write#to(DynamicDestinations)}. A {@link DynamicDestinations} class
@@ -166,8 +180,6 @@ import org.joda.time.Duration;
  *       }),
  *       new Params().withBaseFilename(baseDirectory + "/empty");
  * }</pre>
- *
- * <p>Any existing files with the same names as generated output files will be overwritten.
  */
 public class TextIO {
   /**


[3/6] beam git commit: Adds EmptyMatchTreatment to AvroIO

Posted by jk...@apache.org.
Adds EmptyMatchTreatment to AvroIO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/82b08523
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/82b08523
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/82b08523

Branch: refs/heads/master
Commit: 82b08523084aa6f20ea3c4d5b8b89cdbe0378060
Parents: 84eb7f3
Author: Eugene Kirpichov <ek...@gmail.com>
Authored: Wed Aug 16 14:40:52 2017 -0700
Committer: Eugene Kirpichov <ek...@gmail.com>
Committed: Wed Aug 30 11:55:18 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 161 ++++++++++++++-----
 .../java/org/apache/beam/sdk/io/AvroSource.java |  22 ++-
 .../apache/beam/sdk/io/BlockBasedSource.java    |  27 +++-
 .../org/apache/beam/sdk/io/FileBasedSource.java |   4 +
 4 files changed, 171 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/82b08523/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index d4a7cbb..9601a7d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -39,6 +39,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
@@ -69,6 +70,12 @@ import org.apache.beam.sdk.values.TypeDescriptors;
  *
  * <p>See {@link FileSystems} for information on supported file systems and filepatterns.
  *
+ * <h3>Filepattern expansion and watching</h3>
+ *
+ * <p>By default, {@link #read} prohibits filepatterns that match no files, and {@link #readAll}
+ * allows them in case the filepattern contains a glob wildcard character. Use {@link
+ * Read#withEmptyMatchTreatment} to configure this behavior.
+ *
  * <h3>Reading records of a known schema</h3>
  *
  * <p>To read specific records, such as Avro-generated classes, use {@link #read(Class)}. To read
@@ -239,6 +246,7 @@ public class AvroIO {
    */
   public static <T> Read<T> read(Class<T> recordClass) {
     return new AutoValue_AvroIO_Read.Builder<T>()
+        .setEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW)
         .setRecordClass(recordClass)
         .setSchema(ReflectData.get().getSchema(recordClass))
         .setHintMatchesManyFiles(false)
@@ -248,6 +256,7 @@ public class AvroIO {
   /** Like {@link #read}, but reads each filepattern in the input {@link PCollection}. */
   public static <T> ReadAll<T> readAll(Class<T> recordClass) {
     return new AutoValue_AvroIO_ReadAll.Builder<T>()
+        .setEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD)
         .setRecordClass(recordClass)
         .setSchema(ReflectData.get().getSchema(recordClass))
         // 64MB is a reasonable value that allows to amortize the cost of opening files,
@@ -260,6 +269,7 @@ public class AvroIO {
   /** Reads Avro file(s) containing records of the specified schema. */
   public static Read<GenericRecord> readGenericRecords(Schema schema) {
     return new AutoValue_AvroIO_Read.Builder<GenericRecord>()
+        .setEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW)
         .setRecordClass(GenericRecord.class)
         .setSchema(schema)
         .setHintMatchesManyFiles(false)
@@ -272,6 +282,7 @@ public class AvroIO {
    */
   public static ReadAll<GenericRecord> readAllGenericRecords(Schema schema) {
     return new AutoValue_AvroIO_ReadAll.Builder<GenericRecord>()
+        .setEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD)
         .setRecordClass(GenericRecord.class)
         .setSchema(schema)
         .setDesiredBundleSizeBytes(64 * 1024 * 1024L)
@@ -300,6 +311,7 @@ public class AvroIO {
    */
   public static <T> Parse<T> parseGenericRecords(SerializableFunction<GenericRecord, T> parseFn) {
     return new AutoValue_AvroIO_Parse.Builder<T>()
+        .setEmptyMatchTreatment(EmptyMatchTreatment.DISALLOW)
         .setParseFn(parseFn)
         .setHintMatchesManyFiles(false)
         .build();
@@ -312,6 +324,7 @@ public class AvroIO {
   public static <T> ParseAll<T> parseAllGenericRecords(
       SerializableFunction<GenericRecord, T> parseFn) {
     return new AutoValue_AvroIO_ParseAll.Builder<T>()
+        .setEmptyMatchTreatment(EmptyMatchTreatment.ALLOW_IF_WILDCARD)
         .setParseFn(parseFn)
         .setDesiredBundleSizeBytes(64 * 1024 * 1024L)
         .build();
@@ -392,6 +405,7 @@ public class AvroIO {
   @AutoValue
   public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
     @Nullable abstract ValueProvider<String> getFilepattern();
+    abstract EmptyMatchTreatment getEmptyMatchTreatment();
     @Nullable abstract Class<T> getRecordClass();
     @Nullable abstract Schema getSchema();
     abstract boolean getHintMatchesManyFiles();
@@ -401,6 +415,7 @@ public class AvroIO {
     @AutoValue.Builder
     abstract static class Builder<T> {
       abstract Builder<T> setFilepattern(ValueProvider<String> filepattern);
+      abstract Builder<T> setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment);
       abstract Builder<T> setRecordClass(Class<T> recordClass);
       abstract Builder<T> setSchema(Schema schema);
       abstract Builder<T> setHintMatchesManyFiles(boolean hintManyFiles);
@@ -424,6 +439,13 @@ public class AvroIO {
     }
 
     /**
+     * Configures whether or not a filepattern matching no files is allowed.
+     */
+    public Read<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return toBuilder().setEmptyMatchTreatment(treatment).build();
+    }
+
+    /**
      * Hints that the filepattern specified in {@link #from(String)} matches a very large number of
      * files.
      *
@@ -440,37 +462,48 @@ public class AvroIO {
     public PCollection<T> expand(PBegin input) {
       checkNotNull(getFilepattern(), "filepattern");
       checkNotNull(getSchema(), "schema");
-      if (getHintMatchesManyFiles()) {
-        ReadAll<T> readAll =
-            (getRecordClass() == GenericRecord.class)
-                ? (ReadAll<T>) readAllGenericRecords(getSchema())
-                : readAll(getRecordClass());
-        return input
-            .apply(Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
-            .apply(readAll);
-      } else {
-        return input
-            .getPipeline()
-            .apply(
-                "Read",
-                org.apache.beam.sdk.io.Read.from(
-                    createSource(getFilepattern(), getRecordClass(), getSchema())));
+
+      if (!getHintMatchesManyFiles()) {
+        return input.apply(
+            "Read",
+            org.apache.beam.sdk.io.Read.from(
+                createSource(
+                    getFilepattern(), getEmptyMatchTreatment(), getRecordClass(), getSchema())));
       }
+      // All other cases go through ReadAll.
+
+      ReadAll<T> readAll =
+          (getRecordClass() == GenericRecord.class)
+              ? (ReadAll<T>) readAllGenericRecords(getSchema())
+              : readAll(getRecordClass());
+      readAll = readAll.withEmptyMatchTreatment(getEmptyMatchTreatment());
+      return input
+          .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
+          .apply("Via ReadAll", readAll);
     }
 
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
-      builder.addIfNotNull(
-          DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"));
+      builder
+          .addIfNotNull(
+              DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"))
+          .add(
+              DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString())
+                  .withLabel("Treatment of filepatterns that match no files"));
     }
 
     @SuppressWarnings("unchecked")
     private static <T> AvroSource<T> createSource(
-        ValueProvider<String> filepattern, Class<T> recordClass, Schema schema) {
+        ValueProvider<String> filepattern,
+        EmptyMatchTreatment emptyMatchTreatment,
+        Class<T> recordClass,
+        Schema schema) {
+      AvroSource<?> source =
+          AvroSource.from(filepattern).withEmptyMatchTreatment(emptyMatchTreatment);
       return recordClass == GenericRecord.class
-          ? (AvroSource<T>) AvroSource.from(filepattern).withSchema(schema)
-          : AvroSource.from(filepattern).withSchema(recordClass);
+          ? (AvroSource<T>) source.withSchema(schema)
+          : source.withSchema(recordClass);
     }
   }
 
@@ -479,6 +512,7 @@ public class AvroIO {
   /** Implementation of {@link #readAll}. */
   @AutoValue
   public abstract static class ReadAll<T> extends PTransform<PCollection<String>, PCollection<T>> {
+    abstract EmptyMatchTreatment getEmptyMatchTreatment();
     @Nullable abstract Class<T> getRecordClass();
     @Nullable abstract Schema getSchema();
     abstract long getDesiredBundleSizeBytes();
@@ -487,6 +521,7 @@ public class AvroIO {
 
     @AutoValue.Builder
     abstract static class Builder<T> {
+      abstract Builder<T> setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment);
       abstract Builder<T> setRecordClass(Class<T> recordClass);
       abstract Builder<T> setSchema(Schema schema);
       abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
@@ -494,6 +529,11 @@ public class AvroIO {
       abstract ReadAll<T> build();
     }
 
+    /** Like {@link Read#withEmptyMatchTreatment}. */
+    public ReadAll<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return toBuilder().setEmptyMatchTreatment(treatment).build();
+    }
+
     @VisibleForTesting
     ReadAll<T> withDesiredBundleSizeBytes(long desiredBundleSizeBytes) {
       return toBuilder().setDesiredBundleSizeBytes(desiredBundleSizeBytes).build();
@@ -502,24 +542,40 @@ public class AvroIO {
     @Override
     public PCollection<T> expand(PCollection<String> input) {
       checkNotNull(getSchema(), "schema");
+      Match.Filepatterns matchFilepatterns =
+          Match.filepatterns().withEmptyMatchTreatment(getEmptyMatchTreatment());
+
       return input
-          .apply(Match.filepatterns())
+          .apply(matchFilepatterns)
           .apply(
               "Read all via FileBasedSource",
               new ReadAllViaFileBasedSource<>(
                   SerializableFunctions.<String, Boolean>constant(true) /* isSplittable */,
                   getDesiredBundleSizeBytes(),
-                  new CreateSourceFn<>(getRecordClass(), getSchema().toString())))
+                  new CreateSourceFn<>(
+                      getEmptyMatchTreatment(), getRecordClass(), getSchema().toString())))
           .setCoder(AvroCoder.of(getRecordClass(), getSchema()));
     }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .add(
+              DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString())
+                  .withLabel("Treatment of filepatterns that match no files"));
+    }
   }
 
   private static class CreateSourceFn<T>
       implements SerializableFunction<String, FileBasedSource<T>> {
+    private final EmptyMatchTreatment emptyMatchTreatment;
     private final Class<T> recordClass;
     private final Supplier<Schema> schemaSupplier;
 
-    public CreateSourceFn(Class<T> recordClass, String jsonSchema) {
+    public CreateSourceFn(
+        EmptyMatchTreatment emptyMatchTreatment, Class<T> recordClass, String jsonSchema) {
+      this.emptyMatchTreatment = emptyMatchTreatment;
       this.recordClass = recordClass;
       this.schemaSupplier = AvroUtils.serializableSchemaSupplier(jsonSchema);
     }
@@ -527,7 +583,7 @@ public class AvroIO {
     @Override
     public FileBasedSource<T> apply(String input) {
       return Read.createSource(
-          StaticValueProvider.of(input), recordClass, schemaSupplier.get());
+          StaticValueProvider.of(input), emptyMatchTreatment, recordClass, schemaSupplier.get());
     }
   }
 
@@ -537,6 +593,7 @@ public class AvroIO {
   @AutoValue
   public abstract static class Parse<T> extends PTransform<PBegin, PCollection<T>> {
     @Nullable abstract ValueProvider<String> getFilepattern();
+    abstract EmptyMatchTreatment getEmptyMatchTreatment();
     abstract SerializableFunction<GenericRecord, T> getParseFn();
     @Nullable abstract Coder<T> getCoder();
     abstract boolean getHintMatchesManyFiles();
@@ -546,6 +603,7 @@ public class AvroIO {
     @AutoValue.Builder
     abstract static class Builder<T> {
       abstract Builder<T> setFilepattern(ValueProvider<String> filepattern);
+      abstract Builder<T> setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment);
       abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn);
       abstract Builder<T> setCoder(Coder<T> coder);
       abstract Builder<T> setHintMatchesManyFiles(boolean hintMatchesManyFiles);
@@ -563,6 +621,11 @@ public class AvroIO {
       return toBuilder().setFilepattern(filepattern).build();
     }
 
+    /** Like {@link Read#withEmptyMatchTreatment}. */
+    public Parse<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return toBuilder().setEmptyMatchTreatment(treatment).build();
+    }
+
     /** Sets a coder for the result of the parse function. */
     public Parse<T> withCoder(Coder<T> coder) {
       return toBuilder().setCoder(coder).build();
@@ -577,14 +640,20 @@ public class AvroIO {
     public PCollection<T> expand(PBegin input) {
       checkNotNull(getFilepattern(), "filepattern");
       Coder<T> coder = inferCoder(getCoder(), getParseFn(), input.getPipeline().getCoderRegistry());
-      if (getHintMatchesManyFiles()) {
-        return input
-            .apply(Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
-            .apply(parseAllGenericRecords(getParseFn()).withCoder(getCoder()));
+
+      if (!getHintMatchesManyFiles()) {
+        return input.apply(
+                org.apache.beam.sdk.io.Read.from(
+                        AvroSource.from(getFilepattern()).withParseFn(getParseFn(), coder)));
       }
-      return input.apply(
-          org.apache.beam.sdk.io.Read.from(
-              AvroSource.from(getFilepattern()).withParseFn(getParseFn(), coder)));
+      // All other cases go through ParseAllGenericRecords.
+      ParseAll<T> parseAll =
+          parseAllGenericRecords(getParseFn())
+              .withCoder(coder)
+              .withEmptyMatchTreatment(getEmptyMatchTreatment());
+      return input
+          .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
+          .apply("Via ParseAll", parseAll);
     }
 
     private static <T> Coder<T> inferCoder(
@@ -612,7 +681,10 @@ public class AvroIO {
       builder
           .addIfNotNull(
               DisplayData.item("filePattern", getFilepattern()).withLabel("Input File Pattern"))
-          .add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function"));
+          .add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function"))
+          .add(
+              DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString())
+                  .withLabel("Treatment of filepatterns that match no files"));
     }
   }
 
@@ -621,6 +693,7 @@ public class AvroIO {
   /** Implementation of {@link #parseAllGenericRecords}. */
   @AutoValue
   public abstract static class ParseAll<T> extends PTransform<PCollection<String>, PCollection<T>> {
+    abstract EmptyMatchTreatment getEmptyMatchTreatment();
     abstract SerializableFunction<GenericRecord, T> getParseFn();
     @Nullable abstract Coder<T> getCoder();
     abstract long getDesiredBundleSizeBytes();
@@ -629,6 +702,7 @@ public class AvroIO {
 
     @AutoValue.Builder
     abstract static class Builder<T> {
+      abstract Builder<T> setEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment);
       abstract Builder<T> setParseFn(SerializableFunction<GenericRecord, T> parseFn);
       abstract Builder<T> setCoder(Coder<T> coder);
       abstract Builder<T> setDesiredBundleSizeBytes(long desiredBundleSizeBytes);
@@ -636,6 +710,11 @@ public class AvroIO {
       abstract ParseAll<T> build();
     }
 
+    /** Like {@link Read#withEmptyMatchTreatment}. */
+    public ParseAll<T> withEmptyMatchTreatment(EmptyMatchTreatment treatment) {
+      return toBuilder().setEmptyMatchTreatment(treatment).build();
+    }
+
     /** Specifies the coder for the result of the {@code parseFn}. */
     public ParseAll<T> withCoder(Coder<T> coder) {
       return toBuilder().setCoder(coder).build();
@@ -650,15 +729,21 @@ public class AvroIO {
     public PCollection<T> expand(PCollection<String> input) {
       final Coder<T> coder =
           Parse.inferCoder(getCoder(), getParseFn(), input.getPipeline().getCoderRegistry());
-      SerializableFunction<String, FileBasedSource<T>> createSource =
+      final SerializableFunction<GenericRecord, T> parseFn = getParseFn();
+      final EmptyMatchTreatment emptyMatchTreatment = getEmptyMatchTreatment();
+      final SerializableFunction<String, FileBasedSource<T>> createSource =
           new SerializableFunction<String, FileBasedSource<T>>() {
             @Override
             public FileBasedSource<T> apply(String input) {
-              return AvroSource.from(input).withParseFn(getParseFn(), coder);
+              return AvroSource.from(input)
+                  .withParseFn(parseFn, coder)
+                  .withEmptyMatchTreatment(emptyMatchTreatment);
             }
           };
+      Match.Filepatterns matchFilepatterns =
+              Match.filepatterns().withEmptyMatchTreatment(emptyMatchTreatment);
       return input
-          .apply(Match.filepatterns())
+          .apply(matchFilepatterns)
           .apply(
               "Parse all via FileBasedSource",
               new ReadAllViaFileBasedSource<>(
@@ -671,7 +756,11 @@ public class AvroIO {
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
-      builder.add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function"));
+      builder
+          .add(DisplayData.item("parseFn", getParseFn().getClass()).withLabel("Parse function"))
+          .add(
+              DisplayData.item("emptyMatchTreatment", getEmptyMatchTreatment().toString())
+                  .withLabel("Treatment of filepatterns that match no files"));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/82b08523/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
index 8dd3125..2600d76 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSource.java
@@ -57,6 +57,7 @@ import org.apache.beam.sdk.PipelineRunner;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -211,6 +212,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
   public static AvroSource<GenericRecord> from(ValueProvider<String> fileNameOrPattern) {
     return new AvroSource<>(
         fileNameOrPattern,
+        EmptyMatchTreatment.DISALLOW,
         DEFAULT_MIN_BUNDLE_SIZE,
         readGenericRecordsWithSchema(null /* will need to be specified in withSchema */));
   }
@@ -220,11 +222,20 @@ public class AvroSource<T> extends BlockBasedSource<T> {
     return from(ValueProvider.StaticValueProvider.of(fileNameOrPattern));
   }
 
+  public AvroSource<T> withEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment) {
+    return new AvroSource<T>(
+            getFileOrPatternSpecProvider(),
+            emptyMatchTreatment,
+            getMinBundleSize(),
+            mode);
+  }
+
   /** Reads files containing records that conform to the given schema. */
   public AvroSource<GenericRecord> withSchema(String schema) {
     checkNotNull(schema, "schema");
     return new AvroSource<>(
         getFileOrPatternSpecProvider(),
+        getEmptyMatchTreatment(),
         getMinBundleSize(),
         readGenericRecordsWithSchema(schema));
   }
@@ -240,6 +251,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
     checkNotNull(clazz, "clazz");
     return new AvroSource<>(
         getFileOrPatternSpecProvider(),
+        getEmptyMatchTreatment(),
         getMinBundleSize(),
         readGeneratedClasses(clazz));
   }
@@ -254,6 +266,7 @@ public class AvroSource<T> extends BlockBasedSource<T> {
     checkNotNull(parseFn, "coder");
     return new AvroSource<>(
         getFileOrPatternSpecProvider(),
+        getEmptyMatchTreatment(),
         getMinBundleSize(),
         parseGenericRecords(parseFn, coder));
   }
@@ -263,15 +276,17 @@ public class AvroSource<T> extends BlockBasedSource<T> {
    * minBundleSize} and its use.
    */
   public AvroSource<T> withMinBundleSize(long minBundleSize) {
-    return new AvroSource<>(getFileOrPatternSpecProvider(), minBundleSize, mode);
+    return new AvroSource<>(
+        getFileOrPatternSpecProvider(), getEmptyMatchTreatment(), minBundleSize, mode);
   }
 
   /** Constructor for FILEPATTERN mode. */
   private AvroSource(
       ValueProvider<String> fileNameOrPattern,
+      EmptyMatchTreatment emptyMatchTreatment,
       long minBundleSize,
       Mode<T> mode) {
-    super(fileNameOrPattern, minBundleSize);
+    super(fileNameOrPattern, emptyMatchTreatment, minBundleSize);
     this.mode = mode;
   }
 
@@ -463,7 +478,8 @@ public class AvroSource<T> extends BlockBasedSource<T> {
         return new AvroSource<>(
             getSingleFileMetadata(), getMinBundleSize(), getStartOffset(), getEndOffset(), mode);
       case FILEPATTERN:
-        return new AvroSource<>(getFileOrPatternSpecProvider(), getMinBundleSize(), mode);
+        return new AvroSource<>(
+            getFileOrPatternSpecProvider(), getEmptyMatchTreatment(), getMinBundleSize(), mode);
         default:
           throw new InvalidObjectException(
               String.format("Unknown mode %s for AvroSource %s", getMode(), this));

http://git-wip-us.apache.org/repos/asf/beam/blob/82b08523/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
index 25e8483..ec4f4ad 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BlockBasedSource.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.NoSuchElementException;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
 import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
@@ -63,18 +64,36 @@ import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 public abstract class BlockBasedSource<T> extends FileBasedSource<T> {
   /**
    * Creates a {@code BlockBasedSource} based on a file name or pattern. Subclasses must call this
-   * constructor when creating a {@code BlockBasedSource} for a file pattern. See
-   * {@link FileBasedSource} for more information.
+   * constructor when creating a {@code BlockBasedSource} for a file pattern. See {@link
+   * FileBasedSource} for more information.
+   */
+  public BlockBasedSource(
+      String fileOrPatternSpec, EmptyMatchTreatment emptyMatchTreatment, long minBundleSize) {
+    this(StaticValueProvider.of(fileOrPatternSpec), emptyMatchTreatment, minBundleSize);
+  }
+
+  /**
+   * Like {@link #BlockBasedSource(String, EmptyMatchTreatment, long)} but with a default {@link
+   * EmptyMatchTreatment} of {@link EmptyMatchTreatment#DISALLOW}.
    */
   public BlockBasedSource(String fileOrPatternSpec, long minBundleSize) {
-    super(StaticValueProvider.of(fileOrPatternSpec), minBundleSize);
+    this(StaticValueProvider.of(fileOrPatternSpec), minBundleSize);
   }
 
   /** Like {@link #BlockBasedSource(String, long)}. */
   public BlockBasedSource(ValueProvider<String> fileOrPatternSpec, long minBundleSize) {
-    super(fileOrPatternSpec, minBundleSize);
+    this(fileOrPatternSpec, EmptyMatchTreatment.DISALLOW, minBundleSize);
   }
 
+  /** Like {@link #BlockBasedSource(String, EmptyMatchTreatment, long)}. */
+  public BlockBasedSource(
+          ValueProvider<String> fileOrPatternSpec,
+          EmptyMatchTreatment emptyMatchTreatment,
+          long minBundleSize) {
+    super(fileOrPatternSpec, emptyMatchTreatment, minBundleSize);
+  }
+
+
   /**
    * Creates a {@code BlockBasedSource} for a single file. Subclasses must call this constructor
    * when implementing {@link BlockBasedSource#createForSubrangeOfFile}. See documentation in

http://git-wip-us.apache.org/repos/asf/beam/blob/82b08523/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
index f835fa4..dabda84 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java
@@ -154,6 +154,10 @@ public abstract class FileBasedSource<T> extends OffsetBasedSource<T> {
     return fileOrPatternSpec;
   }
 
+  public final EmptyMatchTreatment getEmptyMatchTreatment() {
+    return emptyMatchTreatment;
+  }
+
   public final Mode getMode() {
     return mode;
   }


[5/6] beam git commit: Fixes a findbugs error in Apex runner

Posted by jk...@apache.org.
Fixes a findbugs error in Apex runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6590aed4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6590aed4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6590aed4

Branch: refs/heads/master
Commit: 6590aed4091a1fbff75311afc45c3d3df1b80d38
Parents: f1f3987
Author: Eugene Kirpichov <ek...@gmail.com>
Authored: Wed Aug 16 17:50:25 2017 -0700
Committer: Eugene Kirpichov <ek...@gmail.com>
Committed: Wed Aug 30 11:55:19 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/apex/translation/utils/ApexStateInternals.java  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6590aed4/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
index 18ea8e4..e23601d 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/utils/ApexStateInternals.java
@@ -431,7 +431,7 @@ public class ApexStateInternals<K> implements StateInternals {
     /**
      * Serializable state for internals (namespace to state tag to coded value).
      */
-    private Map<Slice, Table<String, String, byte[]>> perKeyState = new HashMap<>();
+    private Map<Slice, HashBasedTable<String, String, byte[]>> perKeyState = new HashMap<>();
     private final Coder<K> keyCoder;
 
     private ApexStateInternalsFactory(Coder<K> keyCoder) {
@@ -451,7 +451,7 @@ public class ApexStateInternals<K> implements StateInternals {
       } catch (CoderException e) {
         throw new RuntimeException(e);
       }
-      Table<String, String, byte[]> stateTable = perKeyState.get(keyBytes);
+      HashBasedTable<String, String, byte[]> stateTable = perKeyState.get(keyBytes);
       if (stateTable == null) {
         stateTable = HashBasedTable.create();
         perKeyState.put(keyBytes, stateTable);


[6/6] beam git commit: This closes #3725: [BEAM-2827] Introduces AvroIO.watchForNewFiles

Posted by jk...@apache.org.
This closes #3725: [BEAM-2827] Introduces AvroIO.watchForNewFiles


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5c2cab01
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5c2cab01
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5c2cab01

Branch: refs/heads/master
Commit: 5c2cab0179725ec433392b30efcfa2833b825a18
Parents: d64f2cc 6590aed
Author: Eugene Kirpichov <ek...@gmail.com>
Authored: Wed Aug 30 11:55:45 2017 -0700
Committer: Eugene Kirpichov <ek...@gmail.com>
Committed: Wed Aug 30 11:55:45 2017 -0700

----------------------------------------------------------------------
 .../translation/utils/ApexStateInternals.java   |   4 +-
 .../java/org/apache/beam/sdk/io/AvroIO.java     | 364 +++++++++++++++----
 .../java/org/apache/beam/sdk/io/AvroSource.java |  22 +-
 .../apache/beam/sdk/io/BlockBasedSource.java    |  27 +-
 .../org/apache/beam/sdk/io/FileBasedSource.java |   4 +
 .../java/org/apache/beam/sdk/io/TextIO.java     |  45 ++-
 .../org/apache/beam/sdk/transforms/Watch.java   |  42 +++
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |  94 ++++-
 8 files changed, 505 insertions(+), 97 deletions(-)
----------------------------------------------------------------------