You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/30 19:11:37 UTC

[2/6] beam git commit: Gets rid of raw type in TextIO.Read.watchForNewFiles

Gets rid of raw type in TextIO.Read.watchForNewFiles


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/184f7a9b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/184f7a9b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/184f7a9b

Branch: refs/heads/master
Commit: 184f7a9b31641641cdb4bc7ddcf3556c0514f71b
Parents: d64f2cc
Author: Eugene Kirpichov <ek...@gmail.com>
Authored: Wed Aug 16 14:25:33 2017 -0700
Committer: Eugene Kirpichov <ek...@gmail.com>
Committed: Wed Aug 30 11:55:18 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/io/TextIO.java     | 15 ++++---
 .../org/apache/beam/sdk/transforms/Watch.java   | 42 ++++++++++++++++++++
 2 files changed, 51 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/184f7a9b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index cbc17ff..835008f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
+import static org.apache.beam.sdk.transforms.Watch.Growth.ignoreInput;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
@@ -250,7 +251,7 @@ public class TextIO {
     abstract Duration getWatchForNewFilesInterval();
 
     @Nullable
-    abstract TerminationCondition getWatchForNewFilesTerminationCondition();
+    abstract TerminationCondition<?, ?> getWatchForNewFilesTerminationCondition();
 
     abstract boolean getHintMatchesManyFiles();
     abstract EmptyMatchTreatment getEmptyMatchTreatment();
@@ -262,7 +263,8 @@ public class TextIO {
       abstract Builder setFilepattern(ValueProvider<String> filepattern);
       abstract Builder setCompressionType(CompressionType compressionType);
       abstract Builder setWatchForNewFilesInterval(Duration watchForNewFilesInterval);
-      abstract Builder setWatchForNewFilesTerminationCondition(TerminationCondition condition);
+      abstract Builder setWatchForNewFilesTerminationCondition(
+              TerminationCondition<?, ?> condition);
       abstract Builder setHintMatchesManyFiles(boolean hintManyFiles);
       abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment);
 
@@ -312,7 +314,8 @@ public class TextIO {
      * @see TerminationCondition
      */
     @Experimental(Kind.SPLITTABLE_DO_FN)
-    public Read watchForNewFiles(Duration pollInterval, TerminationCondition terminationCondition) {
+    public Read watchForNewFiles(
+        Duration pollInterval, TerminationCondition<?, ?> terminationCondition) {
       return toBuilder()
           .setWatchForNewFilesInterval(pollInterval)
           .setWatchForNewFilesTerminationCondition(terminationCondition)
@@ -352,9 +355,9 @@ public class TextIO {
               .withCompressionType(getCompressionType())
               .withEmptyMatchTreatment(getEmptyMatchTreatment());
       if (getWatchForNewFilesInterval() != null) {
-        readAll =
-            readAll.watchForNewFiles(
-                getWatchForNewFilesInterval(), getWatchForNewFilesTerminationCondition());
+        TerminationCondition<String, ?> readAllCondition =
+            ignoreInput(getWatchForNewFilesTerminationCondition());
+        readAll = readAll.watchForNewFiles(getWatchForNewFilesInterval(), readAllCondition);
       }
       return input
           .apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))

http://git-wip-us.apache.org/repos/asf/beam/blob/184f7a9b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
index 9da2408..21f0641 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
@@ -264,6 +264,15 @@ public class Watch {
     }
 
     /**
+     * Wraps a given input-independent {@link TerminationCondition} as an equivalent condition
+     * with a given input type, passing {@code null} to the original condition as input.
+     */
+    public static <InputT, StateT> TerminationCondition<InputT, StateT> ignoreInput(
+        TerminationCondition<?, StateT> condition) {
+      return new IgnoreInput<>(condition);
+    }
+
+    /**
      * Returns a {@link TerminationCondition} that holds after the given time has elapsed after the
      * current input was seen.
      */
@@ -344,6 +353,39 @@ public class Watch {
       }
     }
 
+    static class IgnoreInput<InputT, StateT> implements TerminationCondition<InputT, StateT> {
+      private final TerminationCondition<?, StateT> wrapped;
+
+      IgnoreInput(TerminationCondition<?, StateT> wrapped) {
+        this.wrapped = wrapped;
+      }
+
+      @Override
+      public Coder<StateT> getStateCoder() {
+        return wrapped.getStateCoder();
+      }
+
+      @Override
+      public StateT forNewInput(Instant now, InputT input) {
+        return wrapped.forNewInput(now, null);
+      }
+
+      @Override
+      public StateT onSeenNewOutput(Instant now, StateT state) {
+        return wrapped.onSeenNewOutput(now, state);
+      }
+
+      @Override
+      public boolean canStopPolling(Instant now, StateT state) {
+        return wrapped.canStopPolling(now, state);
+      }
+
+      @Override
+      public String toString(StateT state) {
+        return wrapped.toString(state);
+      }
+    }
+
     static class AfterTotalOf<InputT>
         implements TerminationCondition<
             InputT, KV<Instant /* timeStarted */, ReadableDuration /* maxTimeSinceInput */>> {