You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/22 21:04:30 UTC

[1/2] beam git commit: Deprecate GetAllowedTimestampSkew

Repository: beam
Updated Branches:
  refs/heads/master c31b63340 -> 7c7bb8209


Deprecate GetAllowedTimestampSkew

AllowedTimestampSkew is unsafe, as it allows elements to be produced
before the watermark, which causes them to be late. BEAM-644 tracks
replacements for this method.

Handle infinite skew in SimpleDoFnRunner

Update tests to ensure that "unlimited skew" is respected


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9f970fa3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9f970fa3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9f970fa3

Branch: refs/heads/master
Commit: 9f970fa3f1b447c292445ee1c230f120ce6e97b1
Parents: c31b633
Author: Thomas Groh <tg...@google.com>
Authored: Tue Mar 14 12:46:49 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Mar 22 14:03:26 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/SimpleDoFnRunner.java     |   6 +-
 .../beam/runners/core/SimpleDoFnRunnerTest.java | 146 +++++++++++++++++++
 .../org/apache/beam/sdk/transforms/DoFn.java    |  19 ++-
 .../beam/sdk/transforms/WithTimestamps.java     |  26 +++-
 .../apache/beam/sdk/transforms/ParDoTest.java   | 112 ++++++++++----
 .../beam/sdk/io/mongodb/MongoDbGridFSIO.java    |   1 +
 6 files changed, 272 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9f970fa3/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index f5a559c..dfa9645 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -576,8 +576,12 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
       return windowedValue.getWindows();
     }
 
+    @SuppressWarnings("deprecation") // Allowed Skew is deprecated for users, but must be respected
     private void checkTimestamp(Instant timestamp) {
-      if (timestamp.isBefore(windowedValue.getTimestamp().minus(fn.getAllowedTimestampSkew()))) {
+      // The documentation of getAllowedTimestampSkew explicitly permits Long.MAX_VALUE to be used
+      // for infinite skew. Skip the subtraction then, as it underflows for pre-epoch timestamps.
+      if (fn.getAllowedTimestampSkew().getMillis() != Long.MAX_VALUE
+          && timestamp.isBefore(windowedValue.getTimestamp().minus(fn.getAllowedTimestampSkew()))) {
         throw new IllegalArgumentException(
             String.format(
                 "Cannot output with timestamp %s. Output timestamps must be no earlier than the "

http://git-wip-us.apache.org/repos/asf/beam/blob/9f970fa3/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
index 193722c..d8c5149 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
@@ -18,15 +18,20 @@
 package org.apache.beam.runners.core;
 
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.isA;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import org.apache.beam.runners.core.BaseExecutionContext.StepContext;
+import org.apache.beam.runners.core.DoFnRunners.OutputManager;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -45,6 +50,7 @@ import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.joda.time.format.PeriodFormat;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -234,6 +240,115 @@ public class SimpleDoFnRunnerTest {
                 TimeDomain.EVENT_TIME)));
   }
 
+  /**
+   * Demonstrates that attempting to output an element before the timestamp of the current element
+   * with zero {@link DoFn#getAllowedTimestampSkew() allowed timestamp skew} throws.
+   */
+  @Test
+  public void testBackwardsInTimeNoSkew() {
+    SkewingDoFn fn = new SkewingDoFn(Duration.ZERO);
+    DoFnRunner<Duration, Duration> runner =
+        new SimpleDoFnRunner<>(
+            null,
+            fn,
+            NullSideInputReader.empty(),
+            new ListOutputManager(),
+            new TupleTag<Duration>(),
+            Collections.<TupleTag<?>>emptyList(),
+            mockStepContext,
+            null,
+            WindowingStrategy.of(new GlobalWindows()));
+
+    runner.startBundle();
+    // An element output at the current timestamp is fine.
+    runner.processElement(
+        WindowedValue.timestampedValueInGlobalWindow(Duration.ZERO, new Instant(0)));
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(isA(IllegalArgumentException.class));
+    thrown.expectMessage("must be no earlier");
+    thrown.expectMessage(
+        String.format("timestamp of the current input (%s)", new Instant(0).toString()));
+    thrown.expectMessage(
+        String.format(
+            "the allowed skew (%s)", PeriodFormat.getDefault().print(Duration.ZERO.toPeriod())));
+    // An element output before (current time - skew) is forbidden
+    runner.processElement(
+        WindowedValue.timestampedValueInGlobalWindow(Duration.millis(1L), new Instant(0)));
+  }
+
+  /**
+   * Demonstrates that outputting an element with a timestamp earlier than the current element's
+   * timestamp minus {@link DoFn#getAllowedTimestampSkew()} throws, while outputting at or after
+   * that bound succeeds.
+   */
+  @Test
+  public void testSkew() {
+    SkewingDoFn fn = new SkewingDoFn(Duration.standardMinutes(10L));
+    DoFnRunner<Duration, Duration> runner =
+        new SimpleDoFnRunner<>(
+            null,
+            fn,
+            NullSideInputReader.empty(),
+            new ListOutputManager(),
+            new TupleTag<Duration>(),
+            Collections.<TupleTag<?>>emptyList(),
+            mockStepContext,
+            null,
+            WindowingStrategy.of(new GlobalWindows()));
+
+    runner.startBundle();
+    // Outputting between "now" and "now - allowed skew" succeeds.
+    runner.processElement(
+        WindowedValue.timestampedValueInGlobalWindow(Duration.standardMinutes(5L), new Instant(0)));
+    thrown.expect(UserCodeException.class);
+    thrown.expectCause(isA(IllegalArgumentException.class));
+    thrown.expectMessage("must be no earlier");
+    thrown.expectMessage(
+        String.format("timestamp of the current input (%s)", new Instant(0).toString()));
+    thrown.expectMessage(
+        String.format(
+            "the allowed skew (%s)",
+            PeriodFormat.getDefault().print(Duration.standardMinutes(10L).toPeriod())));
+    // Outputting before "now - allowed skew" fails.
+    runner.processElement(
+        WindowedValue.timestampedValueInGlobalWindow(Duration.standardHours(1L), new Instant(0)));
+  }
+
+  /**
+   * Demonstrates that attempting to output an element with a timestamp before the current one
+   * always succeeds when {@link DoFn#getAllowedTimestampSkew()} is equal to
+   * {@link Long#MAX_VALUE} milliseconds.
+   */
+  @Test
+  public void testInfiniteSkew() {
+    SkewingDoFn fn = new SkewingDoFn(Duration.millis(Long.MAX_VALUE));
+    DoFnRunner<Duration, Duration> runner =
+        new SimpleDoFnRunner<>(
+            null,
+            fn,
+            NullSideInputReader.empty(),
+            new ListOutputManager(),
+            new TupleTag<Duration>(),
+            Collections.<TupleTag<?>>emptyList(),
+            mockStepContext,
+            null,
+            WindowingStrategy.of(new GlobalWindows()));
+
+    runner.startBundle();
+    runner.processElement(
+        WindowedValue.timestampedValueInGlobalWindow(Duration.millis(1L), new Instant(0)));
+    runner.processElement(
+        WindowedValue.timestampedValueInGlobalWindow(
+            Duration.millis(1L), BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(1))));
+    runner.processElement(
+        WindowedValue.timestampedValueInGlobalWindow(
+            // This is the maximum amount a timestamp in Beam can move (from the maximum timestamp
+            // to the minimum timestamp).
+            Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())
+                .minus(Duration.millis(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis())),
+            BoundedWindow.TIMESTAMP_MAX_VALUE));
+  }
+
   static class ThrowingDoFn extends DoFn<String, String> {
     final Exception exceptionToThrow = new UnsupportedOperationException("Expected exception");
 
@@ -296,4 +411,35 @@ public class SimpleDoFnRunnerTest {
               context.timeDomain()));
     }
   }
+
+
+  /**
+   * A {@link DoFn} that outputs elements with timestamp equal to the input timestamp minus the
+   * input element.
+   */
+  private static class SkewingDoFn extends DoFn<Duration, Duration> {
+    private final Duration allowedSkew;
+
+    private SkewingDoFn(Duration allowedSkew) {
+      this.allowedSkew = allowedSkew;
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext context) {
+      context.outputWithTimestamp(context.element(), context.timestamp().minus(context.element()));
+    }
+
+    @Override
+    public Duration getAllowedTimestampSkew() {
+      return allowedSkew;
+    }
+  }
+
+  private static class ListOutputManager implements OutputManager {
+    private ListMultimap<TupleTag<?>, WindowedValue<?>> outputs = ArrayListMultimap.create();
+    @Override
+    public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+      outputs.put(tag, output);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/9f970fa3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 6f88738..6c5abbc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -43,6 +43,7 @@ import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.Timer;
 import org.apache.beam.sdk.util.TimerSpec;
@@ -317,14 +318,20 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
   }
 
   /**
-   * Returns the allowed timestamp skew duration, which is the maximum
-   * duration that timestamps can be shifted backward in
-   * {@link DoFn.Context#outputWithTimestamp}.
+   * Returns the allowed timestamp skew duration, which is the maximum duration that timestamps can
+   * be shifted backward in {@link DoFn.Context#outputWithTimestamp}.
+   *
+   * <p>The default value is {@code Duration.ZERO}, in which case timestamps can only be shifted
+   * forward into the future. For infinite skew, return {@code Duration.millis(Long.MAX_VALUE)}.
+   *
+   * @deprecated This method permits a {@link DoFn} to emit elements behind the watermark. These
+   *     elements are considered late, and if behind the
+   *     {@link Window#withAllowedLateness(Duration) allowed lateness} of a downstream
+   *     {@link PCollection} may be silently dropped. See
+   *     https://issues.apache.org/jira/browse/BEAM-644 for details on a replacement.
    *
-   * <p>The default value is {@code Duration.ZERO}, in which case
-   * timestamps can only be shifted forward to future.  For infinite
-   * skew, return {@code Duration.millis(Long.MAX_VALUE)}.
    */
+  @Deprecated
   public Duration getAllowedTimestampSkew() {
     return Duration.ZERO;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/9f970fa3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
index 387707b..6f20226 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
@@ -45,10 +45,16 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T>
    * each element is output with a timestamp obtained as the result of {@code fn.apply(v)}.
    *
    * <p>If the input {@link PCollection} elements have timestamps, the output timestamp for each
-   * element must not be before the input element's timestamp minus the value of
-   * {@link #getAllowedTimestampSkew()}. If an output timestamp is before this time, the transform
-   * will throw an {@link IllegalArgumentException} when executed. Use
-   * {@link #withAllowedTimestampSkew(Duration)} to update the allowed skew.
+   * element must not be before the input element's timestamp minus the value of {@link
+   * #getAllowedTimestampSkew()}. If an output timestamp is before this time, the transform will
+   * throw an {@link IllegalArgumentException} when executed. Use {@link
+   * #withAllowedTimestampSkew(Duration)} to update the allowed skew.
+   *
+   * <p>CAUTION: Use of {@link #withAllowedTimestampSkew(Duration)} permits elements to be emitted
+   * behind the watermark. These elements are considered late, and if behind the {@link
+   * Window#withAllowedLateness(Duration) allowed lateness} of a downstream {@link PCollection} may
+   * be silently dropped. See https://issues.apache.org/jira/browse/BEAM-644 for details on a
+   * replacement.
    *
    * <p>Each output element will be in the same windows as the input element. If a new window based
    * on the new output timestamp is desired, apply a new instance of {@link Window#into(WindowFn)}.
@@ -82,7 +88,13 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T>
    *
    * <p>The default value is {@code Duration.ZERO}, allowing timestamps to only be shifted into the
    * future. For infinite skew, use {@code new Duration(Long.MAX_VALUE)}.
+   * @deprecated This method permits elements to be emitted behind the watermark. These
+   *     elements are considered late, and if behind the
+   *     {@link Window#withAllowedLateness(Duration) allowed lateness} of a downstream
+   *     {@link PCollection} may be silently dropped. See
+   *     https://issues.apache.org/jira/browse/BEAM-644 for details on a replacement.
    */
+  @Deprecated
   public WithTimestamps<T> withAllowedTimestampSkew(Duration allowedTimestampSkew) {
     return new WithTimestamps<>(this.fn, allowedTimestampSkew);
   }
@@ -92,7 +104,13 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T>
    * duration that timestamps can be shifted backwards from the timestamp of the input element.
    *
    * @see DoFn#getAllowedTimestampSkew()
+   * @deprecated This method permits elements to be emitted behind the watermark. These
+   *     elements are considered late, and if behind the
+   *     {@link Window#withAllowedLateness(Duration) allowed lateness} of a downstream
+   *     {@link PCollection} may be silently dropped. See
+   *     https://issues.apache.org/jira/browse/BEAM-644 for details on a replacement.
    */
+  @Deprecated
   public Duration getAllowedTimestampSkew() {
     return allowedTimestampSkew;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/9f970fa3/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index d5786f1..f7bf17a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -29,6 +29,7 @@ import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
 import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
@@ -250,15 +251,15 @@ public class ParDoTest implements Serializable {
     }
   }
 
-  static class TestOutputTimestampDoFn extends DoFn<Integer, Integer> {
+  static class TestOutputTimestampDoFn<T extends Number> extends DoFn<T, T> {
     @ProcessElement
     public void processElement(ProcessContext c) {
-      Integer value = c.element();
+      T value = c.element();
       c.outputWithTimestamp(value, new Instant(value.longValue()));
     }
   }
 
-  static class TestShiftTimestampDoFn extends DoFn<Integer, Integer> {
+  static class TestShiftTimestampDoFn<T extends Number> extends DoFn<T, T> {
     private Duration allowedTimestampSkew;
     private Duration durationToShift;
 
@@ -276,12 +277,12 @@ public class ParDoTest implements Serializable {
     public void processElement(ProcessContext c) {
       Instant timestamp = c.timestamp();
       checkNotNull(timestamp);
-      Integer value = c.element();
+      T value = c.element();
       c.outputWithTimestamp(value, timestamp.plus(durationToShift));
     }
   }
 
-  static class TestFormatTimestampDoFn extends DoFn<Integer, String> {
+  static class TestFormatTimestampDoFn<T extends Number> extends DoFn<T, String> {
     @ProcessElement
     public void processElement(ProcessContext c) {
       checkNotNull(c.timestamp());
@@ -1238,9 +1239,9 @@ public class ParDoTest implements Serializable {
 
     PCollection<String> output =
         input
-        .apply(ParDo.of(new TestOutputTimestampDoFn()))
-        .apply(ParDo.of(new TestShiftTimestampDoFn(Duration.ZERO, Duration.ZERO)))
-        .apply(ParDo.of(new TestFormatTimestampDoFn()));
+        .apply(ParDo.of(new TestOutputTimestampDoFn<Integer>()))
+        .apply(ParDo.of(new TestShiftTimestampDoFn<Integer>(Duration.ZERO, Duration.ZERO)))
+        .apply(ParDo.of(new TestFormatTimestampDoFn<Integer>()));
 
     PAssert.that(output).containsInAnyOrder(
                    "processing: 3, timestamp: 3",
@@ -1270,8 +1271,8 @@ public class ParDoTest implements Serializable {
                     sideOutputTag, c.element(), new Instant(c.element().longValue()));
               }
             })).get(sideOutputTag)
-        .apply(ParDo.of(new TestShiftTimestampDoFn(Duration.ZERO, Duration.ZERO)))
-        .apply(ParDo.of(new TestFormatTimestampDoFn()));
+        .apply(ParDo.of(new TestShiftTimestampDoFn<Integer>(Duration.ZERO, Duration.ZERO)))
+        .apply(ParDo.of(new TestFormatTimestampDoFn<Integer>()));
 
     PAssert.that(output).containsInAnyOrder(
                    "processing: 3, timestamp: 3",
@@ -1282,7 +1283,7 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category(NeedsRunner.class)
+  @Category(RunnableOnService.class)
   public void testParDoShiftTimestamp() {
 
     PCollection<Integer> input =
@@ -1290,10 +1291,12 @@ public class ParDoTest implements Serializable {
 
     PCollection<String> output =
         input
-        .apply(ParDo.of(new TestOutputTimestampDoFn()))
-        .apply(ParDo.of(new TestShiftTimestampDoFn(Duration.millis(1000),
-                                                   Duration.millis(-1000))))
-        .apply(ParDo.of(new TestFormatTimestampDoFn()));
+            .apply(ParDo.of(new TestOutputTimestampDoFn<Integer>()))
+            .apply(
+                ParDo.of(
+                    new TestShiftTimestampDoFn<Integer>(
+                        Duration.millis(1000), Duration.millis(-1000))))
+            .apply(ParDo.of(new TestFormatTimestampDoFn<Integer>()));
 
     PAssert.that(output).containsInAnyOrder(
                    "processing: 3, timestamp: -997",
@@ -1304,14 +1307,18 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
-  @Category(NeedsRunner.class)
+  @Category(RunnableOnService.class)
   public void testParDoShiftTimestampInvalid() {
 
-    pipeline.apply(Create.of(Arrays.asList(3, 42, 6)))
-        .apply(ParDo.of(new TestOutputTimestampDoFn()))
-        .apply(ParDo.of(new TestShiftTimestampDoFn(Duration.millis(1000), // allowed skew = 1 second
-                                                   Duration.millis(-1001))))
-        .apply(ParDo.of(new TestFormatTimestampDoFn()));
+    pipeline
+        .apply(Create.of(Arrays.asList(3, 42, 6)))
+        .apply(ParDo.of(new TestOutputTimestampDoFn<Integer>()))
+        .apply(
+            ParDo.of(
+                new TestShiftTimestampDoFn<Integer>(
+                    Duration.millis(1000), // allowed skew = 1 second
+                    Duration.millis(-1001))))
+        .apply(ParDo.of(new TestFormatTimestampDoFn<Integer>()));
 
     thrown.expect(RuntimeException.class);
     thrown.expectMessage("Cannot output with timestamp");
@@ -1324,12 +1331,11 @@ public class ParDoTest implements Serializable {
   @Test
   @Category(NeedsRunner.class)
   public void testParDoShiftTimestampInvalidZeroAllowed() {
-
-    pipeline.apply(Create.of(Arrays.asList(3, 42, 6)))
-        .apply(ParDo.of(new TestOutputTimestampDoFn()))
-        .apply(ParDo.of(new TestShiftTimestampDoFn(Duration.ZERO,
-                                                   Duration.millis(-1001))))
-        .apply(ParDo.of(new TestFormatTimestampDoFn()));
+    pipeline
+        .apply(Create.of(Arrays.asList(3, 42, 6)))
+        .apply(ParDo.of(new TestOutputTimestampDoFn<Integer>()))
+        .apply(ParDo.of(new TestShiftTimestampDoFn<Integer>(Duration.ZERO, Duration.millis(-1001))))
+        .apply(ParDo.of(new TestFormatTimestampDoFn<Integer>()));
 
     thrown.expect(RuntimeException.class);
     thrown.expectMessage("Cannot output with timestamp");
@@ -1339,6 +1345,58 @@ public class ParDoTest implements Serializable {
     pipeline.run();
   }
 
+  @Test
+  @Category(RunnableOnService.class)
+  public void testParDoShiftTimestampUnlimited() {
+    PCollection<Long> outputs =
+        pipeline
+            .apply(
+                Create.of(
+                    Arrays.asList(
+                        0L,
+                        BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis(),
+                        GlobalWindow.INSTANCE.maxTimestamp().getMillis())))
+            .apply("AssignTimestampToValue", ParDo.of(new TestOutputTimestampDoFn<Long>()))
+            .apply("ReassignToMinimumTimestamp",
+                ParDo.of(
+                    new DoFn<Long, Long>() {
+                      @ProcessElement
+                      public void reassignTimestamps(ProcessContext context) {
+                        // Shift the latest element as far backwards in time as the model permits
+                        context.outputWithTimestamp(
+                            context.element(), BoundedWindow.TIMESTAMP_MIN_VALUE);
+                      }
+
+                      @Override
+                      public Duration getAllowedTimestampSkew() {
+                        return Duration.millis(Long.MAX_VALUE);
+                      }
+                    }));
+
+    PAssert.that(outputs)
+        .satisfies(
+            new SerializableFunction<Iterable<Long>, Void>() {
+              @Override
+              public Void apply(Iterable<Long> input) {
+                // This element is not shifted backwards in time. It must be present in the output.
+                assertThat(input, hasItem(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()));
+                for (Long elem : input) {
+                  // Sanity check the outputs. 0L and the end of the global window are shifted
+                  // backwards in time and theoretically could be dropped.
+                  assertThat(
+                      elem,
+                      anyOf(
+                          equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()),
+                          equalTo(GlobalWindow.INSTANCE.maxTimestamp().getMillis()),
+                          equalTo(0L)));
+                }
+                return null;
+              }
+            });
+
+    pipeline.run();
+  }
+
   private static class Checker implements SerializableFunction<Iterable<String>, Void> {
     @Override
     public Void apply(Iterable<String> input) {

http://git-wip-us.apache.org/repos/asf/beam/blob/9f970fa3/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
index b356dad..d924c14 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
@@ -174,6 +174,7 @@ public class MongoDbGridFSIO {
         .setParser(TEXT_PARSER)
         .setCoder(StringUtf8Coder.of())
         .setConnectionConfiguration(ConnectionConfiguration.create())
+        .setSkew(Duration.ZERO)
         .build();
   }
 


[2/2] beam git commit: This closes #2264

Posted by tg...@apache.org.
This closes #2264


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7c7bb820
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7c7bb820
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7c7bb820

Branch: refs/heads/master
Commit: 7c7bb82096df53603f2885db4efd09560e71a60f
Parents: c31b633 9f970fa
Author: Thomas Groh <tg...@google.com>
Authored: Wed Mar 22 14:04:13 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Mar 22 14:04:13 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/SimpleDoFnRunner.java     |   6 +-
 .../beam/runners/core/SimpleDoFnRunnerTest.java | 146 +++++++++++++++++++
 .../org/apache/beam/sdk/transforms/DoFn.java    |  19 ++-
 .../beam/sdk/transforms/WithTimestamps.java     |  26 +++-
 .../apache/beam/sdk/transforms/ParDoTest.java   | 112 ++++++++++----
 .../beam/sdk/io/mongodb/MongoDbGridFSIO.java    |   1 +
 6 files changed, 272 insertions(+), 38 deletions(-)
----------------------------------------------------------------------