Posted to commits@beam.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/12/06 00:36:00 UTC

[jira] [Commented] (BEAM-3030) watchForNewFiles() can emit a file multiple times if it's growing

    [ https://issues.apache.org/jira/browse/BEAM-3030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16279443#comment-16279443 ] 

ASF GitHub Bot commented on BEAM-3030:
--------------------------------------

asfgit closed pull request #4190: [BEAM-3030] Adds a deduplication key to Watch, and uses it to handle growing files in FileIO.match
URL: https://github.com/apache/beam/pull/4190
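
For context, here is a minimal, illustrative sketch (not taken from the PR; the filepattern, poll interval and termination condition are placeholders) of the user-facing entry point this change affects. With the deduplication key in place, a file that keeps growing is emitted once, keyed by filename, rather than once per observed size:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.transforms.Watch;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class WatchGrowingFilesSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // Continuously match a filepattern; with the deduplication key added by this PR,
    // a file that is observed again with a different size is not re-emitted.
    PCollection<MatchResult.Metadata> newFiles =
        p.apply(
            FileIO.match()
                .filepattern("/tmp/watched/*.log") // illustrative path
                .continuously(
                    Duration.standardSeconds(30), // illustrative poll interval
                    Watch.Growth.<String>afterTimeSinceNewOutput(Duration.standardHours(1))));

    p.run().waitUntilFinish();
  }
}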
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff of a foreign (forked) pull request once it is merged, the diff
is reproduced below for the sake of provenance:

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
index a244c070129..4e7124af8a5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java
@@ -33,13 +33,17 @@
 import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
 import org.apache.beam.sdk.io.fs.MatchResult;
 import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.transforms.Contextful;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Requirements;
 import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.Watch;
+import org.apache.beam.sdk.transforms.Watch.Growth.PollFn;
 import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
@@ -69,6 +73,11 @@
    * <p>By default, a filepattern matching no resources is treated according to {@link
    * EmptyMatchTreatment#DISALLOW}. To configure this behavior, use {@link
    * Match#withEmptyMatchTreatment}.
+   *
+   * <p>Returned {@link MatchResult.Metadata} are deduplicated by filename. For example, if this
+   * transform observes a file with the same name several times with different metadata (e.g.
+   * because the file is growing), it will emit the metadata the first time this file is observed,
+   * and will ignore future changes to this file.
    */
   public static Match match() {
     return new AutoValue_FileIO_Match.Builder()
@@ -317,13 +326,17 @@ public MatchAll continuously(
             "Match filepatterns",
             ParDo.of(new MatchFn(getConfiguration().getEmptyMatchTreatment())));
       } else {
-        res = input
-            .apply(
-                "Continuously match filepatterns",
-                Watch.growthOf(new MatchPollFn())
-                    .withPollInterval(getConfiguration().getWatchInterval())
-                    .withTerminationPerInput(getConfiguration().getWatchTerminationCondition()))
-            .apply(Values.<MatchResult.Metadata>create());
+        res =
+            input
+                .apply(
+                    "Continuously match filepatterns",
+                    Watch.growthOf(
+                            Contextful.<PollFn<String, MatchResult.Metadata>>of(
+                                new MatchPollFn(), Requirements.empty()),
+                            new ExtractFilenameFn())
+                        .withPollInterval(getConfiguration().getWatchInterval())
+                        .withTerminationPerInput(getConfiguration().getWatchTerminationCondition()))
+                .apply(Values.<MatchResult.Metadata>create());
       }
       return res.apply(Reshuffle.<MatchResult.Metadata>viaRandomKey());
     }
@@ -346,7 +359,7 @@ public void process(ProcessContext c) throws Exception {
       }
     }
 
-    private static class MatchPollFn extends Watch.Growth.PollFn<String, MatchResult.Metadata> {
+    private static class MatchPollFn extends PollFn<String, MatchResult.Metadata> {
       @Override
       public Watch.Growth.PollResult<MatchResult.Metadata> apply(String element, Context c)
           throws Exception {
@@ -354,6 +367,14 @@ public void process(ProcessContext c) throws Exception {
             Instant.now(), FileSystems.match(element, EmptyMatchTreatment.ALLOW).metadata());
       }
     }
+
+    private static class ExtractFilenameFn
+        implements SerializableFunction<MatchResult.Metadata, String> {
+      @Override
+      public String apply(MatchResult.Metadata input) {
+        return input.resourceId().toString();
+      }
+    }
   }
 
   /** Implementation of {@link #readMatches}. */
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
index 75c2fe45b80..4b31ae71333 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Watch.java
@@ -58,6 +58,7 @@
 import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.coders.StructuredCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.transforms.Contextful.Fn;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.values.KV;
@@ -117,29 +118,46 @@
   private static final Logger LOG = LoggerFactory.getLogger(Watch.class);
 
   /** Watches the growth of the given poll function. See class documentation for more details. */
-  public static <InputT, OutputT> Growth<InputT, OutputT> growthOf(
-      Contextful<Growth.PollFn<InputT, OutputT>> pollFn) {
-    return new AutoValue_Watch_Growth.Builder<InputT, OutputT>()
-        .setTerminationPerInput(Watch.Growth.<InputT>never())
-        .setPollFn(pollFn)
-        .build();
-  }
-
-  /** Watches the growth of the given poll function. See class documentation for more details. */
-  public static <InputT, OutputT> Growth<InputT, OutputT> growthOf(
+  public static <InputT, OutputT> Growth<InputT, OutputT, OutputT> growthOf(
       Growth.PollFn<InputT, OutputT> pollFn, Requirements requirements) {
-    return growthOf(Contextful.of(pollFn, requirements));
+    return new AutoValue_Watch_Growth.Builder<InputT, OutputT, OutputT>()
+        .setTerminationPerInput(Growth.<InputT>never())
+        .setPollFn(Contextful.of(pollFn, requirements))
+        // use null as a signal that this is the identity function and output coder can be
+        // reused as key coder
+        .setOutputKeyFn(null)
+        .build();
   }
 
   /** Watches the growth of the given poll function. See class documentation for more details. */
-  public static <InputT, OutputT> Growth<InputT, OutputT> growthOf(
+  public static <InputT, OutputT> Growth<InputT, OutputT, OutputT> growthOf(
       Growth.PollFn<InputT, OutputT> pollFn) {
     return growthOf(pollFn, Requirements.empty());
   }
 
+  /**
+   * Watches the growth of the given poll function, using the given "key function" to deduplicate
+   * outputs. For example, if OutputT is a filename + file size, this can be a function that returns
+   * just the filename, so that if the same file is observed multiple times with different sizes,
+   * only the first observation is emitted.
+   *
+   * <p>By default, this is the identity function, i.e. the output is used as its own key.
+   */
+  public static <InputT, OutputT, KeyT> Growth<InputT, OutputT, KeyT> growthOf(
+      Contextful<Growth.PollFn<InputT, OutputT>> pollFn,
+      SerializableFunction<OutputT, KeyT> outputKeyFn) {
+    checkArgument(pollFn != null, "pollFn can not be null");
+    checkArgument(outputKeyFn != null, "outputKeyFn can not be null");
+    return new AutoValue_Watch_Growth.Builder<InputT, OutputT, KeyT>()
+        .setTerminationPerInput(Watch.Growth.<InputT>never())
+        .setPollFn(pollFn)
+        .setOutputKeyFn(outputKeyFn)
+        .build();
+  }
+
   /** Implementation of {@link #growthOf}. */
   @AutoValue
-  public abstract static class Growth<InputT, OutputT>
+  public abstract static class Growth<InputT, OutputT, KeyT>
       extends PTransform<PCollection<InputT>, PCollection<KV<InputT, OutputT>>> {
     /** The result of a single invocation of a {@link PollFn}. */
     public static final class PollResult<OutputT> {
@@ -219,7 +237,7 @@ Instant getWatermark() {
      * {@link PollResult}.
      */
     public abstract static class PollFn<InputT, OutputT>
-        implements Contextful.Fn<InputT, PollResult<OutputT>> {}
+        implements Fn<InputT, PollResult<OutputT>> {}
 
     /**
      * A strategy for determining whether it is time to stop polling the current input regardless of
@@ -550,6 +568,12 @@ public String toString(KV<FirstStateT, SecondStateT> state) {
 
     abstract Contextful<PollFn<InputT, OutputT>> getPollFn();
 
+    @Nullable
+    abstract SerializableFunction<OutputT, KeyT> getOutputKeyFn();
+
+    @Nullable
+    abstract Coder<KeyT> getOutputKeyCoder();
+
     @Nullable
     abstract Duration getPollInterval();
 
@@ -559,24 +583,34 @@ public String toString(KV<FirstStateT, SecondStateT> state) {
     @Nullable
     abstract Coder<OutputT> getOutputCoder();
 
-    abstract Builder<InputT, OutputT> toBuilder();
+    abstract Builder<InputT, OutputT, KeyT> toBuilder();
 
     @AutoValue.Builder
-    abstract static class Builder<InputT, OutputT> {
-      abstract Builder<InputT, OutputT> setPollFn(Contextful<PollFn<InputT, OutputT>> pollFn);
+    abstract static class Builder<InputT, OutputT, KeyT> {
+      abstract Builder<InputT, OutputT, KeyT> setPollFn(Contextful<PollFn<InputT, OutputT>> pollFn);
+
+      abstract Builder<InputT, OutputT, KeyT> setOutputKeyFn(
+          @Nullable SerializableFunction<OutputT, KeyT> outputKeyFn);
 
-      abstract Builder<InputT, OutputT> setTerminationPerInput(
+      abstract Builder<InputT, OutputT, KeyT> setOutputKeyCoder(Coder<KeyT> outputKeyCoder);
+
+      abstract Builder<InputT, OutputT, KeyT> setTerminationPerInput(
           TerminationCondition<InputT, ?> terminationPerInput);
 
-      abstract Builder<InputT, OutputT> setPollInterval(Duration pollInterval);
+      abstract Builder<InputT, OutputT, KeyT> setPollInterval(Duration pollInterval);
+
+      abstract Builder<InputT, OutputT, KeyT> setOutputCoder(Coder<OutputT> outputCoder);
 
-      abstract Builder<InputT, OutputT> setOutputCoder(Coder<OutputT> outputCoder);
+      abstract Growth<InputT, OutputT, KeyT> build();
+    }
 
-      abstract Growth<InputT, OutputT> build();
+    /** Specifies the coder for the output key. */
+    public Growth<InputT, OutputT, KeyT> withOutputKeyCoder(Coder<KeyT> outputKeyCoder) {
+      return toBuilder().setOutputKeyCoder(outputKeyCoder).build();
     }
 
     /** Specifies a {@link TerminationCondition} that will be independently used for every input. */
-    public Growth<InputT, OutputT> withTerminationPerInput(
+    public Growth<InputT, OutputT, KeyT> withTerminationPerInput(
         TerminationCondition<InputT, ?> terminationPerInput) {
       return toBuilder().setTerminationPerInput(terminationPerInput).build();
     }
@@ -585,7 +619,7 @@ public String toString(KV<FirstStateT, SecondStateT> state) {
      * Specifies how long to wait after a call to {@link PollFn} before calling it again (if at all
      * - according to {@link PollResult} and the {@link TerminationCondition}).
      */
-    public Growth<InputT, OutputT> withPollInterval(Duration pollInterval) {
+    public Growth<InputT, OutputT, KeyT> withPollInterval(Duration pollInterval) {
       return toBuilder().setPollInterval(pollInterval).build();
     }
 
@@ -596,7 +630,7 @@ public String toString(KV<FirstStateT, SecondStateT> state) {
      * <p>The coder must be deterministic, because the transform will compare encoded outputs for
      * deduplication between polling rounds.
      */
-    public Growth<InputT, OutputT> withOutputCoder(Coder<OutputT> outputCoder) {
+    public Growth<InputT, OutputT, KeyT> withOutputCoder(Coder<OutputT> outputCoder) {
       return toBuilder().setOutputCoder(outputCoder).build();
     }
 
@@ -618,36 +652,68 @@ public String toString(KV<FirstStateT, SecondStateT> state) {
           outputCoder = input.getPipeline().getCoderRegistry().getCoder(outputT);
         } catch (CannotProvideCoderException e) {
           throw new RuntimeException(
-              "Unable to infer coder for OutputT. Specify it explicitly using withOutputCoder().");
+              "Unable to infer coder for OutputT ("
+                  + outputT
+                  + "). Specify it explicitly using withOutputCoder().");
         }
       }
-      try {
-        outputCoder.verifyDeterministic();
-      } catch (Coder.NonDeterministicException e) {
-        throw new IllegalArgumentException(
-            "Output coder " + outputCoder + " must be deterministic");
+
+      Coder<KeyT> outputKeyCoder = getOutputKeyCoder();
+      SerializableFunction<OutputT, KeyT> outputKeyFn = getOutputKeyFn();
+      if (getOutputKeyFn() == null) {
+        // This by construction can happen only if OutputT == KeyT
+        outputKeyCoder = (Coder) outputCoder;
+        outputKeyFn = (SerializableFunction) SerializableFunctions.identity();
+      } else {
+        if (outputKeyCoder == null) {
+          // If a coder was not specified explicitly, infer it from the OutputT type parameter
+          // of the output key fn.
+          TypeDescriptor<KeyT> keyT = TypeDescriptors.outputOf(getOutputKeyFn());
+          try {
+            outputKeyCoder = input.getPipeline().getCoderRegistry().getCoder(keyT);
+          } catch (CannotProvideCoderException e) {
+            throw new RuntimeException(
+                "Unable to infer coder for KeyT ("
+                    + keyT
+                    + "). Specify it explicitly using withOutputKeyCoder().");
+          }
+        }
+        try {
+          outputKeyCoder.verifyDeterministic();
+        } catch (Coder.NonDeterministicException e) {
+          throw new IllegalArgumentException(
+              "Key coder " + outputKeyCoder + " must be deterministic");
+        }
       }
 
       return input
-          .apply(ParDo.of(new WatchGrowthFn<>(this, outputCoder))
+          .apply(ParDo.of(new WatchGrowthFn<>(this, outputCoder, outputKeyFn, outputKeyCoder))
           .withSideInputs(getPollFn().getRequirements().getSideInputs()))
           .setCoder(KvCoder.of(input.getCoder(), outputCoder));
     }
   }
 
-  private static class WatchGrowthFn<InputT, OutputT, TerminationStateT>
+  private static class WatchGrowthFn<InputT, OutputT, KeyT, TerminationStateT>
       extends DoFn<InputT, KV<InputT, OutputT>> {
-    private final Watch.Growth<InputT, OutputT> spec;
+    private final Watch.Growth<InputT, OutputT, KeyT> spec;
     private final Coder<OutputT> outputCoder;
-
-    private WatchGrowthFn(Growth<InputT, OutputT> spec, Coder<OutputT> outputCoder) {
+    private final SerializableFunction<OutputT, KeyT> outputKeyFn;
+    private final Coder<KeyT> outputKeyCoder;
+
+    private WatchGrowthFn(
+        Growth<InputT, OutputT, KeyT> spec,
+        Coder<OutputT> outputCoder,
+        SerializableFunction<OutputT, KeyT> outputKeyFn,
+        Coder<KeyT> outputKeyCoder) {
       this.spec = spec;
       this.outputCoder = outputCoder;
+      this.outputKeyFn = outputKeyFn;
+      this.outputKeyCoder = outputKeyCoder;
     }
 
     @ProcessElement
     public ProcessContinuation process(
-        ProcessContext c, final GrowthTracker<OutputT, TerminationStateT> tracker)
+        ProcessContext c, final GrowthTracker<OutputT, KeyT, TerminationStateT> tracker)
         throws Exception {
       if (!tracker.hasPending() && !tracker.currentRestriction().isOutputComplete) {
         LOG.debug("{} - polling input", c.element());
@@ -700,26 +766,27 @@ public ProcessContinuation process(
     }
 
     @GetInitialRestriction
-    public GrowthState<OutputT, TerminationStateT> getInitialRestriction(InputT element) {
+    public GrowthState<OutputT, KeyT, TerminationStateT> getInitialRestriction(InputT element) {
       return new GrowthState<>(getTerminationCondition().forNewInput(Instant.now(), element));
     }
 
     @NewTracker
-    public GrowthTracker<OutputT, TerminationStateT> newTracker(
-        GrowthState<OutputT, TerminationStateT> restriction) {
-      return new GrowthTracker<>(outputCoder, restriction, getTerminationCondition());
+    public GrowthTracker<OutputT, KeyT, TerminationStateT> newTracker(
+        GrowthState<OutputT, KeyT, TerminationStateT> restriction) {
+      return new GrowthTracker<>(
+          outputKeyFn, outputKeyCoder, restriction, getTerminationCondition());
     }
 
     @GetRestrictionCoder
     @SuppressWarnings({"unchecked", "rawtypes"})
-    public Coder<GrowthState<OutputT, TerminationStateT>> getRestrictionCoder() {
+    public Coder<GrowthState<OutputT, KeyT, TerminationStateT>> getRestrictionCoder() {
       return GrowthStateCoder.of(
           outputCoder, (Coder) spec.getTerminationPerInput().getStateCoder());
     }
   }
 
   @VisibleForTesting
-  static class GrowthState<OutputT, TerminationStateT> {
+  static class GrowthState<OutputT, KeyT, TerminationStateT> {
     // Hashes and timestamps of outputs that have already been output and should be omitted
     // from future polls. Timestamps are preserved to allow garbage-collecting this state
     // in the future, e.g. dropping elements from "completed" and from addNewAsPending() if their
@@ -781,14 +848,14 @@ public String toString(Growth.TerminationCondition<?, TerminationStateT> termina
   }
 
   @VisibleForTesting
-  static class GrowthTracker<OutputT, TerminationStateT>
-      implements RestrictionTracker<GrowthState<OutputT, TerminationStateT>> {
+  static class GrowthTracker<OutputT, KeyT, TerminationStateT>
+      implements RestrictionTracker<GrowthState<OutputT, KeyT, TerminationStateT>> {
     private final Funnel<OutputT> coderFunnel;
     private final Growth.TerminationCondition<?, TerminationStateT> terminationCondition;
 
     // The restriction describing the entire work to be done by the current ProcessElement call.
     // Changes only in checkpoint().
-    private GrowthState<OutputT, TerminationStateT> state;
+    private GrowthState<OutputT, KeyT, TerminationStateT> state;
 
     // Mutable state changed by the ProcessElement call itself, and used to compute the primary
     // and residual restrictions in checkpoint().
@@ -803,14 +870,19 @@ public String toString(Growth.TerminationCondition<?, TerminationStateT> termina
     @Nullable private Instant pollWatermark;
     private boolean shouldStop = false;
 
-    GrowthTracker(final Coder<OutputT> outputCoder, GrowthState<OutputT, TerminationStateT> state,
-                  Growth.TerminationCondition<?, TerminationStateT> terminationCondition) {
+    GrowthTracker(
+        final SerializableFunction<OutputT, KeyT> keyFn,
+        final Coder<KeyT> outputKeyCoder,
+        GrowthState<OutputT, KeyT, TerminationStateT> state,
+        Growth.TerminationCondition<?, TerminationStateT> terminationCondition) {
       this.coderFunnel =
           new Funnel<OutputT>() {
             @Override
             public void funnel(OutputT from, PrimitiveSink into) {
               try {
-                outputCoder.encode(from, Funnels.asOutputStream(into));
+                // Rather than hashing the output itself, hash the output key.
+                KeyT outputKey = keyFn.apply(from);
+                outputKeyCoder.encode(outputKey, Funnels.asOutputStream(into));
               } catch (IOException e) {
                 throw new RuntimeException(e);
               }
@@ -825,15 +897,15 @@ public void funnel(OutputT from, PrimitiveSink into) {
     }
 
     @Override
-    public synchronized GrowthState<OutputT, TerminationStateT> currentRestriction() {
+    public synchronized GrowthState<OutputT, KeyT, TerminationStateT> currentRestriction() {
       return state;
     }
 
     @Override
-    public synchronized GrowthState<OutputT, TerminationStateT> checkpoint() {
+    public synchronized GrowthState<OutputT, KeyT, TerminationStateT> checkpoint() {
       // primary should contain exactly the work claimed in the current ProcessElement call - i.e.
       // claimed outputs become pending, and it shouldn't poll again.
-      GrowthState<OutputT, TerminationStateT> primary =
+      GrowthState<OutputT, KeyT, TerminationStateT> primary =
           new GrowthState<>(
               state.completed /* completed */,
               claimed /* pending */,
@@ -845,9 +917,10 @@ public void funnel(OutputT from, PrimitiveSink into) {
       // unclaimed pending outputs plus future polling outputs.
       Map<HashCode, Instant> newCompleted = Maps.newHashMap(state.completed);
       for (TimestampedValue<OutputT> claimedOutput : claimed) {
-        newCompleted.put(hash128(claimedOutput.getValue()), claimedOutput.getTimestamp());
+        newCompleted.put(
+            hash128(claimedOutput.getValue()), claimedOutput.getTimestamp());
       }
-      GrowthState<OutputT, TerminationStateT> residual =
+      GrowthState<OutputT, KeyT, TerminationStateT> residual =
           new GrowthState<>(
               newCompleted /* completed */,
               pending /* pending */,
@@ -910,10 +983,14 @@ synchronized int addNewAsPending(Growth.PollResult<OutputT> pollResult) {
           "Should have drained all old pending outputs before adding new, "
               + "but there are %s old pending outputs",
           state.pending.size());
-      List<TimestampedValue<OutputT>> newPending = Lists.newArrayList();
+      // Collect results to include as newly pending. Note that the poll result may in theory
+      // contain multiple outputs mapping to the same output key - we need to ignore duplicates
+      // here already.
+      Map<HashCode, TimestampedValue<OutputT>> newPending = Maps.newHashMap();
       for (TimestampedValue<OutputT> output : pollResult.getOutputs()) {
         OutputT value = output.getValue();
-        if (state.completed.containsKey(hash128(value))) {
+        HashCode hash = hash128(value);
+        if (state.completed.containsKey(hash) || newPending.containsKey(hash)) {
           continue;
         }
         // TODO (https://issues.apache.org/jira/browse/BEAM-2680):
@@ -921,7 +998,7 @@ synchronized int addNewAsPending(Growth.PollResult<OutputT> pollResult) {
         // instead relying on future poll rounds to provide them, in order to avoid
         // blowing up the state. Combined with garbage collection of GrowthState.completed,
         // this would make the transform scalable to very large poll results.
-        newPending.add(TimestampedValue.of(value, output.getTimestamp()));
+        newPending.put(hash, TimestampedValue.of(value, output.getTimestamp()));
       }
       if (!newPending.isEmpty()) {
         terminationState = terminationCondition.onSeenNewOutput(Instant.now(), terminationState);
@@ -936,7 +1013,7 @@ public Instant apply(TimestampedValue<OutputT> output) {
                           return output.getTimestamp();
                         }
                       })
-                  .sortedCopy(newPending));
+                  .sortedCopy(newPending.values()));
       // If poll result doesn't provide a watermark, assume that future new outputs may
       // arrive with about the same timestamps as the current new outputs.
       if (pollResult.getWatermark() != null) {
@@ -1008,10 +1085,11 @@ public HashCode decode(InputStream is) throws IOException {
     }
   }
 
-  private static class GrowthStateCoder<OutputT, TerminationStateT>
-      extends StructuredCoder<GrowthState<OutputT, TerminationStateT>> {
-    public static <OutputT, TerminationStateT> GrowthStateCoder<OutputT, TerminationStateT> of(
-        Coder<OutputT> outputCoder, Coder<TerminationStateT> terminationStateCoder) {
+  private static class GrowthStateCoder<OutputT, KeyT, TerminationStateT>
+      extends StructuredCoder<GrowthState<OutputT, KeyT, TerminationStateT>> {
+    public static <OutputT, KeyT, TerminationStateT>
+        GrowthStateCoder<OutputT, KeyT, TerminationStateT> of(
+            Coder<OutputT> outputCoder, Coder<TerminationStateT> terminationStateCoder) {
       return new GrowthStateCoder<>(outputCoder, terminationStateCoder);
     }
 
@@ -1033,7 +1111,7 @@ private GrowthStateCoder(
     }
 
     @Override
-    public void encode(GrowthState<OutputT, TerminationStateT> value, OutputStream os)
+    public void encode(GrowthState<OutputT, KeyT, TerminationStateT> value, OutputStream os)
         throws IOException {
       completedCoder.encode(value.completed, os);
       pendingCoder.encode(value.pending, os);
@@ -1043,7 +1121,7 @@ public void encode(GrowthState<OutputT, TerminationStateT> value, OutputStream o
     }
 
     @Override
-    public GrowthState<OutputT, TerminationStateT> decode(InputStream is) throws IOException {
+    public GrowthState<OutputT, KeyT, TerminationStateT> decode(InputStream is) throws IOException {
       Map<HashCode, Instant> completed = completedCoder.decode(is);
       List<TimestampedValue<OutputT>> pending = pendingCoder.decode(is);
       boolean isOutputComplete = BOOLEAN_CODER.decode(is);
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java
index 89043766cda..ec6880cd588 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/WatchTest.java
@@ -173,6 +173,60 @@ private void testMultiplePolls(boolean terminationConditionElapsesBeforeOutputIs
     p.run();
   }
 
+  @Test
+  @Category({NeedsRunner.class, UsesSplittableParDo.class})
+  public void testMultiplePollsWithKeyExtractor() {
+    List<KV<Integer, String>> polls =
+        Arrays.asList(
+            KV.of(0, "0"),
+            KV.of(10, "10"),
+            KV.of(20, "20"),
+            KV.of(30, "30"),
+            KV.of(40, "40"),
+            KV.of(40, "40.1"),
+            KV.of(20, "20.1"),
+            KV.of(50, "50"),
+            KV.of(10, "10.1"),
+            KV.of(10, "10.2"),
+            KV.of(60, "60"),
+            KV.of(70, "70"),
+            KV.of(60, "60.1"),
+            KV.of(80, "80"),
+            KV.of(40, "40.2"),
+            KV.of(90, "90"),
+            KV.of(90, "90.1"));
+
+    List<Integer> expected = Arrays.asList(0, 10, 20, 30, 40, 50, 60, 70, 80, 90);
+
+    PCollection<Integer> res =
+        p.apply(Create.of("a"))
+            .apply(
+                Watch.growthOf(
+                        Contextful.<PollFn<String, KV<Integer, String>>>of(
+                            new TimedPollFn<String, KV<Integer, String>>(
+                                polls,
+                                standardSeconds(1) /* timeToOutputEverything */,
+                                standardSeconds(3) /* timeToDeclareOutputFinal */,
+                                standardSeconds(30) /* timeToFail */),
+                            Requirements.empty()),
+                        new SerializableFunction<KV<Integer, String>, Integer>() {
+                          @Override
+                          public Integer apply(KV<Integer, String> input) {
+                            return input.getKey();
+                          }
+                        })
+                    .withTerminationPerInput(Watch.Growth.<String>afterTotalOf(standardSeconds(5)))
+                    .withPollInterval(Duration.millis(100))
+                    .withOutputCoder(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of())))
+            .apply("Drop input", Values.<KV<Integer, String>>create())
+            .apply("Drop auxiliary string", Keys.<Integer>create());
+
+    PAssert.that(res).containsInAnyOrder(expected);
+
+    p.run();
+  }
+
+
   @Test
   @Category({NeedsRunner.class, UsesSplittableParDo.class})
   public void testMultiplePollsStopAfterTimeSinceNewOutput() {
@@ -437,20 +491,23 @@ public void testTerminationConditionsAllOf() {
     assertTrue(c.canStopPolling(now.plus(standardSeconds(12)), state));
   }
 
-  private static GrowthTracker<String, Integer> newTracker(GrowthState<String, Integer> state) {
-    return new GrowthTracker<>(StringUtf8Coder.of(), state, never());
+  private static GrowthTracker<String, String, Integer> newTracker(
+      GrowthState<String, String, Integer> state) {
+    return new GrowthTracker<>(
+        SerializableFunctions.<String>identity(), StringUtf8Coder.of(), state, never());
   }
 
-  private static GrowthTracker<String, Integer> newTracker() {
-    return newTracker(new GrowthState<String, Integer>(never().forNewInput(Instant.now(), null)));
+  private static GrowthTracker<String, String, Integer> newTracker() {
+    return newTracker(
+        new GrowthState<String, String, Integer>(never().forNewInput(Instant.now(), null)));
   }
 
   @Test
   public void testGrowthTrackerCheckpointEmpty() {
     // Checkpoint an empty tracker.
-    GrowthTracker<String, Integer> tracker = newTracker();
-    GrowthState<String, Integer> residual = tracker.checkpoint();
-    GrowthState<String, Integer> primary = tracker.currentRestriction();
+    GrowthTracker<String, String, Integer> tracker = newTracker();
+    GrowthState<String, String, Integer> residual = tracker.checkpoint();
+    GrowthState<String, String, Integer> primary = tracker.currentRestriction();
     Watch.Growth.Never<String> condition = never();
     assertEquals(
         primary.toString(condition),
@@ -475,7 +532,7 @@ public void testGrowthTrackerCheckpointEmpty() {
   @Test
   public void testGrowthTrackerCheckpointNonEmpty() {
     Instant now = Instant.now();
-    GrowthTracker<String, Integer> tracker = newTracker();
+    GrowthTracker<String, String, Integer> tracker = newTracker();
     tracker.addNewAsPending(
         PollResult.incomplete(
                 Arrays.asList(
@@ -493,8 +550,9 @@ public void testGrowthTrackerCheckpointNonEmpty() {
     assertTrue(tracker.hasPending());
     assertEquals(now.plus(standardSeconds(3)), tracker.getWatermark());
 
-    GrowthTracker<String, Integer> residualTracker = newTracker(tracker.checkpoint());
-    GrowthTracker<String, Integer> primaryTracker = newTracker(tracker.currentRestriction());
+    GrowthTracker<String, String, Integer> residualTracker = newTracker(tracker.checkpoint());
+    GrowthTracker<String, String, Integer> primaryTracker =
+        newTracker(tracker.currentRestriction());
 
     // Verify primary: should contain what the current tracker claimed, and nothing else.
     assertEquals(now.plus(standardSeconds(1)), primaryTracker.getWatermark());
@@ -530,7 +588,7 @@ public void testGrowthTrackerCheckpointNonEmpty() {
   @Test
   public void testGrowthTrackerOutputFullyBeforeCheckpointIncomplete() {
     Instant now = Instant.now();
-    GrowthTracker<String, Integer> tracker = newTracker();
+    GrowthTracker<String, String, Integer> tracker = newTracker();
     tracker.addNewAsPending(
         PollResult.incomplete(
                 Arrays.asList(
@@ -547,8 +605,9 @@ public void testGrowthTrackerOutputFullyBeforeCheckpointIncomplete() {
     assertFalse(tracker.hasPending());
     assertEquals(now.plus(standardSeconds(7)), tracker.getWatermark());
 
-    GrowthTracker<String, Integer> residualTracker = newTracker(tracker.checkpoint());
-    GrowthTracker<String, Integer> primaryTracker = newTracker(tracker.currentRestriction());
+    GrowthTracker<String, String, Integer> residualTracker = newTracker(tracker.checkpoint());
+    GrowthTracker<String, String, Integer> primaryTracker =
+        newTracker(tracker.currentRestriction());
 
     // Verify primary: should contain what the current tracker claimed, and nothing else.
     assertEquals(now.plus(standardSeconds(1)), primaryTracker.getWatermark());
@@ -582,7 +641,7 @@ public void testGrowthTrackerOutputFullyBeforeCheckpointIncomplete() {
   @Test
   public void testGrowthTrackerPollAfterCheckpointIncompleteWithNewOutputs() {
     Instant now = Instant.now();
-    GrowthTracker<String, Integer> tracker = newTracker();
+    GrowthTracker<String, String, Integer> tracker = newTracker();
     tracker.addNewAsPending(
         PollResult.incomplete(
                 Arrays.asList(
@@ -597,10 +656,10 @@ public void testGrowthTrackerPollAfterCheckpointIncompleteWithNewOutputs() {
     assertEquals("c", tracker.tryClaimNextPending().getValue());
     assertEquals("d", tracker.tryClaimNextPending().getValue());
 
-    GrowthState<String, Integer> checkpoint = tracker.checkpoint();
+    GrowthState<String, String, Integer> checkpoint = tracker.checkpoint();
     // Simulate resuming from the checkpoint and adding more elements.
     {
-      GrowthTracker<String, Integer> residualTracker = newTracker(checkpoint);
+      GrowthTracker<String, String, Integer> residualTracker = newTracker(checkpoint);
       residualTracker.addNewAsPending(
           PollResult.incomplete(
                   Arrays.asList(
@@ -623,7 +682,7 @@ public void testGrowthTrackerPollAfterCheckpointIncompleteWithNewOutputs() {
     }
     // Try same without an explicitly specified watermark.
     {
-      GrowthTracker<String, Integer> residualTracker = newTracker(checkpoint);
+      GrowthTracker<String, String, Integer> residualTracker = newTracker(checkpoint);
       residualTracker.addNewAsPending(
           PollResult.incomplete(
               Arrays.asList(
@@ -648,7 +707,7 @@ public void testGrowthTrackerPollAfterCheckpointIncompleteWithNewOutputs() {
   @Test
   public void testGrowthTrackerPollAfterCheckpointWithoutNewOutputs() {
     Instant now = Instant.now();
-    GrowthTracker<String, Integer> tracker = newTracker();
+    GrowthTracker<String, String, Integer> tracker = newTracker();
     tracker.addNewAsPending(
         PollResult.incomplete(
                 Arrays.asList(
@@ -664,9 +723,9 @@ public void testGrowthTrackerPollAfterCheckpointWithoutNewOutputs() {
     assertEquals("d", tracker.tryClaimNextPending().getValue());
 
     // Simulate resuming from the checkpoint but there are no new elements.
-    GrowthState<String, Integer> checkpoint = tracker.checkpoint();
+    GrowthState<String, String, Integer> checkpoint = tracker.checkpoint();
     {
-      GrowthTracker<String, Integer> residualTracker = newTracker(checkpoint);
+      GrowthTracker<String, String, Integer> residualTracker = newTracker(checkpoint);
       residualTracker.addNewAsPending(
           PollResult.incomplete(
                   Arrays.asList(
@@ -682,7 +741,7 @@ public void testGrowthTrackerPollAfterCheckpointWithoutNewOutputs() {
     }
     // Try the same without an explicitly specified watermark
     {
-      GrowthTracker<String, Integer> residualTracker = newTracker(checkpoint);
+      GrowthTracker<String, String, Integer> residualTracker = newTracker(checkpoint);
       residualTracker.addNewAsPending(
           PollResult.incomplete(
               Arrays.asList(
@@ -698,7 +757,7 @@ public void testGrowthTrackerPollAfterCheckpointWithoutNewOutputs() {
   @Test
   public void testGrowthTrackerPollAfterCheckpointWithoutNewOutputsNoWatermark() {
     Instant now = Instant.now();
-    GrowthTracker<String, Integer> tracker = newTracker();
+    GrowthTracker<String, String, Integer> tracker = newTracker();
     tracker.addNewAsPending(
         PollResult.incomplete(
             Arrays.asList(
@@ -713,8 +772,8 @@ public void testGrowthTrackerPollAfterCheckpointWithoutNewOutputsNoWatermark() {
     assertEquals(now.plus(standardSeconds(1)), tracker.getWatermark());
 
     // Simulate resuming from the checkpoint but there are no new elements.
-    GrowthState<String, Integer> checkpoint = tracker.checkpoint();
-    GrowthTracker<String, Integer> residualTracker = newTracker(checkpoint);
+    GrowthState<String, String, Integer> checkpoint = tracker.checkpoint();
+    GrowthTracker<String, String, Integer> residualTracker = newTracker(checkpoint);
     residualTracker.addNewAsPending(
         PollResult.incomplete(
             Arrays.asList(
@@ -730,13 +789,13 @@ public void testGrowthTrackerPollAfterCheckpointWithoutNewOutputsNoWatermark() {
   public void testGrowthTrackerRepeatedEmptyPollWatermark() {
     // Empty poll result with no watermark
     {
-      GrowthTracker<String, Integer> tracker = newTracker();
+      GrowthTracker<String, String, Integer> tracker = newTracker();
       tracker.addNewAsPending(
           PollResult.incomplete(Collections.<TimestampedValue<String>>emptyList()));
       assertEquals(BoundedWindow.TIMESTAMP_MIN_VALUE, tracker.getWatermark());
 
       // Simulate resuming from the checkpoint but there are still no new elements.
-      GrowthTracker<String, Integer> residualTracker = newTracker(tracker.checkpoint());
+      GrowthTracker<String, String, Integer> residualTracker = newTracker(tracker.checkpoint());
       tracker.addNewAsPending(
           PollResult.incomplete(Collections.<TimestampedValue<String>>emptyList()));
       // No new elements and no explicit watermark supplied - still no watermark.
@@ -745,14 +804,14 @@ public void testGrowthTrackerRepeatedEmptyPollWatermark() {
     // Empty poll result with watermark
     {
       Instant now = Instant.now();
-      GrowthTracker<String, Integer> tracker = newTracker();
+      GrowthTracker<String, String, Integer> tracker = newTracker();
       tracker.addNewAsPending(
           PollResult.incomplete(Collections.<TimestampedValue<String>>emptyList())
               .withWatermark(now));
       assertEquals(now, tracker.getWatermark());
 
       // Simulate resuming from the checkpoint but there are still no new elements.
-      GrowthTracker<String, Integer> residualTracker = newTracker(tracker.checkpoint());
+      GrowthTracker<String, String, Integer> residualTracker = newTracker(tracker.checkpoint());
       tracker.addNewAsPending(
           PollResult.incomplete(Collections.<TimestampedValue<String>>emptyList()));
       // No new elements and no explicit watermark supplied - should keep old watermark.
@@ -763,7 +822,7 @@ public void testGrowthTrackerRepeatedEmptyPollWatermark() {
   @Test
   public void testGrowthTrackerOutputFullyBeforeCheckpointComplete() {
     Instant now = Instant.now();
-    GrowthTracker<String, Integer> tracker = newTracker();
+    GrowthTracker<String, String, Integer> tracker = newTracker();
     tracker.addNewAsPending(
         PollResult.complete(
             Arrays.asList(
@@ -779,7 +838,7 @@ public void testGrowthTrackerOutputFullyBeforeCheckpointComplete() {
     assertFalse(tracker.hasPending());
     assertEquals(BoundedWindow.TIMESTAMP_MAX_VALUE, tracker.getWatermark());
 
-    GrowthTracker<String, Integer> residualTracker = newTracker(tracker.checkpoint());
+    GrowthTracker<String, String, Integer> residualTracker = newTracker(tracker.checkpoint());
 
     // Verify residual: should be empty, since output was final.
     residualTracker.checkDone();
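
For readers of the diff above, here is a minimal, illustrative sketch of the new Watch.growthOf(pollFn, outputKeyFn) overload used in isolation. Only the API shape comes from this change; the poll function, class names, values and intervals below are made up for the example:

import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Requirements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Watch;
import org.apache.beam.sdk.transforms.Watch.Growth.PollFn;
import org.apache.beam.sdk.transforms.Watch.Growth.PollResult;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class WatchWithKeyFnSketch {
  // Hypothetical poll function: pretends to list one (filename, size) pair per poll.
  private static class ListFilesPollFn extends PollFn<String, KV<String, Long>> {
    @Override
    public PollResult<KV<String, Long>> apply(String directory, Contextful.Fn.Context c) {
      return PollResult.incomplete(
          Collections.singletonList(
              TimestampedValue.of(KV.of(directory + "/file-0", 42L), Instant.now())));
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    PCollection<KV<String, KV<String, Long>>> results =
        p.apply(Create.of("dir"))
            .apply(
                Watch.growthOf(
                        Contextful.<PollFn<String, KV<String, Long>>>of(
                            new ListFilesPollFn(), Requirements.empty()),
                        new SerializableFunction<KV<String, Long>, String>() {
                          @Override
                          public String apply(KV<String, Long> fileAndSize) {
                            // Deduplicate on the filename only, ignoring the size.
                            return fileAndSize.getKey();
                          }
                        })
                    .withOutputKeyCoder(StringUtf8Coder.of())
                    .withPollInterval(Duration.standardSeconds(10))
                    .withTerminationPerInput(
                        Watch.Growth.<String>afterTotalOf(Duration.standardMinutes(1)))
                    .withOutputCoder(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of())));
    p.run().waitUntilFinish();
  }
}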


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> watchForNewFiles() can emit a file multiple times if it's growing
> -----------------------------------------------------------------
>
>                 Key: BEAM-3030
>                 URL: https://issues.apache.org/jira/browse/BEAM-3030
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>            Reporter: Eugene Kirpichov
>            Assignee: Eugene Kirpichov
>             Fix For: 2.3.0
>
>
> TextIO's and AvroIO's watchForNewFiles(), as well as FileIO.match().continuously(), use the Watch transform under the hood and watch the set of Metadata matching a filepattern.
> Two Metadata objects with the same filename but different sizes are not considered equal, so if these transforms observe the same file multiple times with different sizes, they'll read the file multiple times.
> This is likely not yet a problem for production users, because these features require SDF, which is supported only by the Dataflow runner, and Dataflow users are likely to work only with files on GCS, which doesn't support appends. However, it still needs to be fixed.
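
To make the failure mode concrete, a minimal sketch (path and sizes are illustrative): two Metadata observations of the same growing file compare unequal, so deduplicating on the full encoded Metadata re-emits the file, while deduplicating on the filename does not:

import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MatchResult;

public class GrowingFileMetadataSketch {
  public static void main(String[] args) {
    // Two observations of the same file at different sizes (illustrative values).
    MatchResult.Metadata firstPoll =
        MatchResult.Metadata.builder()
            .setResourceId(FileSystems.matchNewResource("/tmp/growing.log", false /* isDirectory */))
            .setSizeBytes(1024L)
            .setIsReadSeekEfficient(true)
            .build();
    MatchResult.Metadata secondPoll =
        MatchResult.Metadata.builder()
            .setResourceId(FileSystems.matchNewResource("/tmp/growing.log", false))
            .setSizeBytes(2048L)
            .setIsReadSeekEfficient(true)
            .build();

    // Unequal Metadata: previously re-emitted by watchForNewFiles().
    System.out.println(firstPoll.equals(secondPoll)); // false, because the sizes differ
    // Equal deduplication keys: emitted once after this fix.
    System.out.println(
        firstPoll.resourceId().toString().equals(secondPoll.resourceId().toString())); // true
  }
}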



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)