You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/08/30 19:11:37 UTC
[2/6] beam git commit: Gets rid of raw type in TextIO.Read.watchForNewFiles
Gets rid of raw type in TextIO.Read.watchForNewFiles
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/184f7a9b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/184f7a9b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/184f7a9b
Branch: refs/heads/master
Commit: 184f7a9b31641641cdb4bc7ddcf3556c0514f71b
Parents: d64f2cc
Author: Eugene Kirpichov <ek...@gmail.com>
Authored: Wed Aug 16 14:25:33 2017 -0700
Committer: Eugene Kirpichov <ek...@gmail.com>
Committed: Wed Aug 30 11:55:18 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/TextIO.java | 15 ++++---
.../org/apache/beam/sdk/transforms/Watch.java | 42 ++++++++++++++++++++
2 files changed, 51 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/184f7a9b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index cbc17ff..835008f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.io;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
+import static org.apache.beam.sdk.transforms.Watch.Growth.ignoreInput;
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
@@ -250,7 +251,7 @@ public class TextIO {
abstract Duration getWatchForNewFilesInterval();
@Nullable
- abstract TerminationCondition getWatchForNewFilesTerminationCondition();
+ abstract TerminationCondition<?, ?> getWatchForNewFilesTerminationCondition();
abstract boolean getHintMatchesManyFiles();
abstract EmptyMatchTreatment getEmptyMatchTreatment();
@@ -262,7 +263,8 @@ public class TextIO {
abstract Builder setFilepattern(ValueProvider<String> filepattern);
abstract Builder setCompressionType(CompressionType compressionType);
abstract Builder setWatchForNewFilesInterval(Duration watchForNewFilesInterval);
- abstract Builder setWatchForNewFilesTerminationCondition(TerminationCondition condition);
+ abstract Builder setWatchForNewFilesTerminationCondition(
+ TerminationCondition<?, ?> condition);
abstract Builder setHintMatchesManyFiles(boolean hintManyFiles);
abstract Builder setEmptyMatchTreatment(EmptyMatchTreatment treatment);
@@ -312,7 +314,8 @@ public class TextIO {
* @see TerminationCondition
*/
@Experimental(Kind.SPLITTABLE_DO_FN)
- public Read watchForNewFiles(Duration pollInterval, TerminationCondition terminationCondition) {
+ public Read watchForNewFiles(
+ Duration pollInterval, TerminationCondition<?, ?> terminationCondition) {
return toBuilder()
.setWatchForNewFilesInterval(pollInterval)
.setWatchForNewFilesTerminationCondition(terminationCondition)
@@ -352,9 +355,9 @@ public class TextIO {
.withCompressionType(getCompressionType())
.withEmptyMatchTreatment(getEmptyMatchTreatment());
if (getWatchForNewFilesInterval() != null) {
- readAll =
- readAll.watchForNewFiles(
- getWatchForNewFilesInterval(), getWatchForNewFilesTerminationCondition());
+ TerminationCondition<String, ?> readAllCondition =
+ ignoreInput(getWatchForNewFilesTerminationCondition());
+ readAll = readAll.watchForNewFiles(getWatchForNewFilesInterval(), readAllCondition);
}
return input
.apply("Create filepattern", Create.ofProvider(getFilepattern(), StringUtf8Coder.of()))
http://git-wip-us.apache.org/repos/asf/beam/blob/184f7a9b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
index 9da2408..21f0641 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
@@ -264,6 +264,15 @@ public class Watch {
}
/**
+ * Wraps a given input-independent {@link TerminationCondition} as an equivalent condition
+ * with a given input type, passing {@code null} to the original condition as input.
+ */
+ public static <InputT, StateT> TerminationCondition<InputT, StateT> ignoreInput(
+ TerminationCondition<?, StateT> condition) {
+ return new IgnoreInput<>(condition);
+ }
+
+ /**
* Returns a {@link TerminationCondition} that holds after the given time has elapsed after the
* current input was seen.
*/
@@ -344,6 +353,39 @@ public class Watch {
}
}
+ static class IgnoreInput<InputT, StateT> implements TerminationCondition<InputT, StateT> {
+ private final TerminationCondition<?, StateT> wrapped;
+
+ IgnoreInput(TerminationCondition<?, StateT> wrapped) {
+ this.wrapped = wrapped;
+ }
+
+ @Override
+ public Coder<StateT> getStateCoder() {
+ return wrapped.getStateCoder();
+ }
+
+ @Override
+ public StateT forNewInput(Instant now, InputT input) {
+ return wrapped.forNewInput(now, null);
+ }
+
+ @Override
+ public StateT onSeenNewOutput(Instant now, StateT state) {
+ return wrapped.onSeenNewOutput(now, state);
+ }
+
+ @Override
+ public boolean canStopPolling(Instant now, StateT state) {
+ return wrapped.canStopPolling(now, state);
+ }
+
+ @Override
+ public String toString(StateT state) {
+ return wrapped.toString(state);
+ }
+ }
+
static class AfterTotalOf<InputT>
implements TerminationCondition<
InputT, KV<Instant /* timeStarted */, ReadableDuration /* maxTimeSinceInput */>> {