You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/22 21:04:30 UTC
[1/2] beam git commit: Deprecate GetAllowedTimestampSkew
Repository: beam
Updated Branches:
refs/heads/master c31b63340 -> 7c7bb8209
Deprecate GetAllowedTimestampSkew
AllowedTimestampSkew is unsafe, as it allows elements to be produced
before the watermark, which causes them to be late. BEAM-644 tracks
replacements for this method.
Handle infinite skew in SimpleDoFnRunner
Update tests to ensure that "unlimited skew" is respected
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9f970fa3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9f970fa3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9f970fa3
Branch: refs/heads/master
Commit: 9f970fa3f1b447c292445ee1c230f120ce6e97b1
Parents: c31b633
Author: Thomas Groh <tg...@google.com>
Authored: Tue Mar 14 12:46:49 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Mar 22 14:03:26 2017 -0700
----------------------------------------------------------------------
.../beam/runners/core/SimpleDoFnRunner.java | 6 +-
.../beam/runners/core/SimpleDoFnRunnerTest.java | 146 +++++++++++++++++++
.../org/apache/beam/sdk/transforms/DoFn.java | 19 ++-
.../beam/sdk/transforms/WithTimestamps.java | 26 +++-
.../apache/beam/sdk/transforms/ParDoTest.java | 112 ++++++++++----
.../beam/sdk/io/mongodb/MongoDbGridFSIO.java | 1 +
6 files changed, 272 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/9f970fa3/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index f5a559c..dfa9645 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -576,8 +576,12 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
return windowedValue.getWindows();
}
+ @SuppressWarnings("deprecation") // Allowed Skew is deprecated for users, but must be respected
private void checkTimestamp(Instant timestamp) {
- if (timestamp.isBefore(windowedValue.getTimestamp().minus(fn.getAllowedTimestampSkew()))) {
+ // The documentation of getAllowedTimestampSkew explicitly permits Long.MAX_VALUE to be used
+ // for infinite skew. Defend against underflow in that case for timestamps before the epoch.
+ if (fn.getAllowedTimestampSkew().getMillis() != Long.MAX_VALUE
+ && timestamp.isBefore(windowedValue.getTimestamp().minus(fn.getAllowedTimestampSkew()))) {
throw new IllegalArgumentException(
String.format(
"Cannot output with timestamp %s. Output timestamps must be no earlier than the "
http://git-wip-us.apache.org/repos/asf/beam/blob/9f970fa3/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
index 193722c..d8c5149 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
@@ -18,15 +18,20 @@
package org.apache.beam.runners.core;
import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.isA;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.beam.runners.core.BaseExecutionContext.StepContext;
+import org.apache.beam.runners.core.DoFnRunners.OutputManager;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.DoFn;
@@ -45,6 +50,7 @@ import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Duration;
import org.joda.time.Instant;
+import org.joda.time.format.PeriodFormat;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -234,6 +240,115 @@ public class SimpleDoFnRunnerTest {
TimeDomain.EVENT_TIME)));
}
+ /**
+ * Demonstrates that attempting to output an element before the timestamp of the current element
+ * with zero {@link DoFn#getAllowedTimestampSkew() allowed timestamp skew} throws.
+ */
+ @Test
+ public void testBackwardsInTimeNoSkew() {
+ SkewingDoFn fn = new SkewingDoFn(Duration.ZERO);
+ DoFnRunner<Duration, Duration> runner =
+ new SimpleDoFnRunner<>(
+ null,
+ fn,
+ NullSideInputReader.empty(),
+ new ListOutputManager(),
+ new TupleTag<Duration>(),
+ Collections.<TupleTag<?>>emptyList(),
+ mockStepContext,
+ null,
+ WindowingStrategy.of(new GlobalWindows()));
+
+ runner.startBundle();
+ // An element output at the current timestamp is fine.
+ runner.processElement(
+ WindowedValue.timestampedValueInGlobalWindow(Duration.ZERO, new Instant(0)));
+ thrown.expect(UserCodeException.class);
+ thrown.expectCause(isA(IllegalArgumentException.class));
+ thrown.expectMessage("must be no earlier");
+ thrown.expectMessage(
+ String.format("timestamp of the current input (%s)", new Instant(0).toString()));
+ thrown.expectMessage(
+ String.format(
+ "the allowed skew (%s)", PeriodFormat.getDefault().print(Duration.ZERO.toPeriod())));
+ // An element output before (current time - skew) is forbidden
+ runner.processElement(
+ WindowedValue.timestampedValueInGlobalWindow(Duration.millis(1L), new Instant(0)));
+ }
+
+ /**
+ * Demonstrates that attempting to output an element before the timestamp of the current element
+ * minus the value of {@link DoFn#getAllowedTimestampSkew()} throws, but between that value and
+ * the current timestamp succeeds.
+ */
+ @Test
+ public void testSkew() {
+ SkewingDoFn fn = new SkewingDoFn(Duration.standardMinutes(10L));
+ DoFnRunner<Duration, Duration> runner =
+ new SimpleDoFnRunner<>(
+ null,
+ fn,
+ NullSideInputReader.empty(),
+ new ListOutputManager(),
+ new TupleTag<Duration>(),
+ Collections.<TupleTag<?>>emptyList(),
+ mockStepContext,
+ null,
+ WindowingStrategy.of(new GlobalWindows()));
+
+ runner.startBundle();
+ // Outputting between "now" and "now - allowed skew" succeeds.
+ runner.processElement(
+ WindowedValue.timestampedValueInGlobalWindow(Duration.standardMinutes(5L), new Instant(0)));
+ thrown.expect(UserCodeException.class);
+ thrown.expectCause(isA(IllegalArgumentException.class));
+ thrown.expectMessage("must be no earlier");
+ thrown.expectMessage(
+ String.format("timestamp of the current input (%s)", new Instant(0).toString()));
+ thrown.expectMessage(
+ String.format(
+ "the allowed skew (%s)",
+ PeriodFormat.getDefault().print(Duration.standardMinutes(10L).toPeriod())));
+ // Outputting before "now - allowed skew" fails.
+ runner.processElement(
+ WindowedValue.timestampedValueInGlobalWindow(Duration.standardHours(1L), new Instant(0)));
+ }
+
+ /**
+ * Demonstrates that attempting to output an element with a timestamp before the current one
+ * always succeeds when {@link DoFn#getAllowedTimestampSkew()} is equal to
+ * {@link Long#MAX_VALUE} milliseconds.
+ */
+ @Test
+ public void testInfiniteSkew() {
+ SkewingDoFn fn = new SkewingDoFn(Duration.millis(Long.MAX_VALUE));
+ DoFnRunner<Duration, Duration> runner =
+ new SimpleDoFnRunner<>(
+ null,
+ fn,
+ NullSideInputReader.empty(),
+ new ListOutputManager(),
+ new TupleTag<Duration>(),
+ Collections.<TupleTag<?>>emptyList(),
+ mockStepContext,
+ null,
+ WindowingStrategy.of(new GlobalWindows()));
+
+ runner.startBundle();
+ runner.processElement(
+ WindowedValue.timestampedValueInGlobalWindow(Duration.millis(1L), new Instant(0)));
+ runner.processElement(
+ WindowedValue.timestampedValueInGlobalWindow(
+ Duration.millis(1L), BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(1))));
+ runner.processElement(
+ WindowedValue.timestampedValueInGlobalWindow(
+ // This is the maximum amount a timestamp in beam can move (from the maximum timestamp
+ // to the minimum timestamp).
+ Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())
+ .minus(Duration.millis(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis())),
+ BoundedWindow.TIMESTAMP_MAX_VALUE));
+ }
+
static class ThrowingDoFn extends DoFn<String, String> {
final Exception exceptionToThrow = new UnsupportedOperationException("Expected exception");
@@ -296,4 +411,35 @@ public class SimpleDoFnRunnerTest {
context.timeDomain()));
}
}
+
+
+ /**
+ * A {@link DoFn} that outputs elements with timestamp equal to the input timestamp minus the
+ * input element.
+ */
+ private static class SkewingDoFn extends DoFn<Duration, Duration> {
+ private final Duration allowedSkew;
+
+ private SkewingDoFn(Duration allowedSkew) {
+ this.allowedSkew = allowedSkew;
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext context) {
+ context.outputWithTimestamp(context.element(), context.timestamp().minus(context.element()));
+ }
+
+ @Override
+ public Duration getAllowedTimestampSkew() {
+ return allowedSkew;
+ }
+ }
+
+ private static class ListOutputManager implements OutputManager {
+ private ListMultimap<TupleTag<?>, WindowedValue<?>> outputs = ArrayListMultimap.create();
+ @Override
+ public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+ outputs.put(tag, output);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/9f970fa3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 6f88738..6c5abbc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -43,6 +43,7 @@ import org.apache.beam.sdk.transforms.display.HasDisplayData;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.Timer;
import org.apache.beam.sdk.util.TimerSpec;
@@ -317,14 +318,20 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
}
/**
- * Returns the allowed timestamp skew duration, which is the maximum
- * duration that timestamps can be shifted backward in
- * {@link DoFn.Context#outputWithTimestamp}.
+ * Returns the allowed timestamp skew duration, which is the maximum duration that timestamps can
+ * be shifted backward in {@link DoFn.Context#outputWithTimestamp}.
+ *
+ * <p>The default value is {@code Duration.ZERO}, in which case timestamps can only be shifted
+ * forward into the future. For infinite skew, return {@code Duration.millis(Long.MAX_VALUE)}.
+ *
+ * @deprecated This method permits a {@link DoFn} to emit elements behind the watermark. These
+ * elements are considered late, and if behind the
+ * {@link Window#withAllowedLateness(Duration) allowed lateness} of a downstream
+ * {@link PCollection} may be silently dropped. See
+ * https://issues.apache.org/jira/browse/BEAM-644 for details on a replacement.
*
- * <p>The default value is {@code Duration.ZERO}, in which case
- * timestamps can only be shifted forward to future. For infinite
- * skew, return {@code Duration.millis(Long.MAX_VALUE)}.
*/
+ @Deprecated
public Duration getAllowedTimestampSkew() {
return Duration.ZERO;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/9f970fa3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
index 387707b..6f20226 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
@@ -45,10 +45,16 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T>
* each element is output with a timestamp obtained as the result of {@code fn.apply(v)}.
*
* <p>If the input {@link PCollection} elements have timestamps, the output timestamp for each
- * element must not be before the input element's timestamp minus the value of
- * {@link #getAllowedTimestampSkew()}. If an output timestamp is before this time, the transform
- * will throw an {@link IllegalArgumentException} when executed. Use
- * {@link #withAllowedTimestampSkew(Duration)} to update the allowed skew.
+ * element must not be before the input element's timestamp minus the value of {@link
+ * #getAllowedTimestampSkew()}. If an output timestamp is before this time, the transform will
+ * throw an {@link IllegalArgumentException} when executed. Use {@link
+ * #withAllowedTimestampSkew(Duration)} to update the allowed skew.
+ *
+ * <p>CAUTION: Use of {@link #withAllowedTimestampSkew(Duration)} permits elements to be emitted
+ * behind the watermark. These elements are considered late, and if behind the {@link
+ * Window#withAllowedLateness(Duration) allowed lateness} of a downstream {@link PCollection} may
+ * be silently dropped. See https://issues.apache.org/jira/browse/BEAM-644 for details on a
+ * replacement.
*
* <p>Each output element will be in the same windows as the input element. If a new window based
* on the new output timestamp is desired, apply a new instance of {@link Window#into(WindowFn)}.
@@ -82,7 +88,13 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T>
*
* <p>The default value is {@code Duration.ZERO}, allowing timestamps to only be shifted into the
* future. For infinite skew, use {@code new Duration(Long.MAX_VALUE)}.
+ * @deprecated This method permits elements to be emitted behind the watermark. These
+ * elements are considered late, and if behind the
+ * {@link Window#withAllowedLateness(Duration) allowed lateness} of a downstream
+ * {@link PCollection} may be silently dropped. See
+ * https://issues.apache.org/jira/browse/BEAM-644 for details on a replacement.
*/
+ @Deprecated
public WithTimestamps<T> withAllowedTimestampSkew(Duration allowedTimestampSkew) {
return new WithTimestamps<>(this.fn, allowedTimestampSkew);
}
@@ -92,7 +104,13 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T>
* duration that timestamps can be shifted backwards from the timestamp of the input element.
*
* @see DoFn#getAllowedTimestampSkew()
+ * @deprecated This method permits elements to be emitted behind the watermark. These
+ * elements are considered late, and if behind the
+ * {@link Window#withAllowedLateness(Duration) allowed lateness} of a downstream
+ * {@link PCollection} may be silently dropped. See
+ * https://issues.apache.org/jira/browse/BEAM-644 for details on a replacement.
*/
+ @Deprecated
public Duration getAllowedTimestampSkew() {
return allowedTimestampSkew;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/9f970fa3/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index d5786f1..f7bf17a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -29,6 +29,7 @@ import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.collection.IsIterableContainingInAnyOrder.containsInAnyOrder;
import static org.hamcrest.collection.IsIterableContainingInOrder.contains;
@@ -250,15 +251,15 @@ public class ParDoTest implements Serializable {
}
}
- static class TestOutputTimestampDoFn extends DoFn<Integer, Integer> {
+ static class TestOutputTimestampDoFn<T extends Number> extends DoFn<T, T> {
@ProcessElement
public void processElement(ProcessContext c) {
- Integer value = c.element();
+ T value = c.element();
c.outputWithTimestamp(value, new Instant(value.longValue()));
}
}
- static class TestShiftTimestampDoFn extends DoFn<Integer, Integer> {
+ static class TestShiftTimestampDoFn<T extends Number> extends DoFn<T, T> {
private Duration allowedTimestampSkew;
private Duration durationToShift;
@@ -276,12 +277,12 @@ public class ParDoTest implements Serializable {
public void processElement(ProcessContext c) {
Instant timestamp = c.timestamp();
checkNotNull(timestamp);
- Integer value = c.element();
+ T value = c.element();
c.outputWithTimestamp(value, timestamp.plus(durationToShift));
}
}
- static class TestFormatTimestampDoFn extends DoFn<Integer, String> {
+ static class TestFormatTimestampDoFn<T extends Number> extends DoFn<T, String> {
@ProcessElement
public void processElement(ProcessContext c) {
checkNotNull(c.timestamp());
@@ -1238,9 +1239,9 @@ public class ParDoTest implements Serializable {
PCollection<String> output =
input
- .apply(ParDo.of(new TestOutputTimestampDoFn()))
- .apply(ParDo.of(new TestShiftTimestampDoFn(Duration.ZERO, Duration.ZERO)))
- .apply(ParDo.of(new TestFormatTimestampDoFn()));
+ .apply(ParDo.of(new TestOutputTimestampDoFn<Integer>()))
+ .apply(ParDo.of(new TestShiftTimestampDoFn<Integer>(Duration.ZERO, Duration.ZERO)))
+ .apply(ParDo.of(new TestFormatTimestampDoFn<Integer>()));
PAssert.that(output).containsInAnyOrder(
"processing: 3, timestamp: 3",
@@ -1270,8 +1271,8 @@ public class ParDoTest implements Serializable {
sideOutputTag, c.element(), new Instant(c.element().longValue()));
}
})).get(sideOutputTag)
- .apply(ParDo.of(new TestShiftTimestampDoFn(Duration.ZERO, Duration.ZERO)))
- .apply(ParDo.of(new TestFormatTimestampDoFn()));
+ .apply(ParDo.of(new TestShiftTimestampDoFn<Integer>(Duration.ZERO, Duration.ZERO)))
+ .apply(ParDo.of(new TestFormatTimestampDoFn<Integer>()));
PAssert.that(output).containsInAnyOrder(
"processing: 3, timestamp: 3",
@@ -1282,7 +1283,7 @@ public class ParDoTest implements Serializable {
}
@Test
- @Category(NeedsRunner.class)
+ @Category(RunnableOnService.class)
public void testParDoShiftTimestamp() {
PCollection<Integer> input =
@@ -1290,10 +1291,12 @@ public class ParDoTest implements Serializable {
PCollection<String> output =
input
- .apply(ParDo.of(new TestOutputTimestampDoFn()))
- .apply(ParDo.of(new TestShiftTimestampDoFn(Duration.millis(1000),
- Duration.millis(-1000))))
- .apply(ParDo.of(new TestFormatTimestampDoFn()));
+ .apply(ParDo.of(new TestOutputTimestampDoFn<Integer>()))
+ .apply(
+ ParDo.of(
+ new TestShiftTimestampDoFn<Integer>(
+ Duration.millis(1000), Duration.millis(-1000))))
+ .apply(ParDo.of(new TestFormatTimestampDoFn<Integer>()));
PAssert.that(output).containsInAnyOrder(
"processing: 3, timestamp: -997",
@@ -1304,14 +1307,18 @@ public class ParDoTest implements Serializable {
}
@Test
- @Category(NeedsRunner.class)
+ @Category(RunnableOnService.class)
public void testParDoShiftTimestampInvalid() {
- pipeline.apply(Create.of(Arrays.asList(3, 42, 6)))
- .apply(ParDo.of(new TestOutputTimestampDoFn()))
- .apply(ParDo.of(new TestShiftTimestampDoFn(Duration.millis(1000), // allowed skew = 1 second
- Duration.millis(-1001))))
- .apply(ParDo.of(new TestFormatTimestampDoFn()));
+ pipeline
+ .apply(Create.of(Arrays.asList(3, 42, 6)))
+ .apply(ParDo.of(new TestOutputTimestampDoFn<Integer>()))
+ .apply(
+ ParDo.of(
+ new TestShiftTimestampDoFn<Integer>(
+ Duration.millis(1000), // allowed skew = 1 second
+ Duration.millis(-1001))))
+ .apply(ParDo.of(new TestFormatTimestampDoFn<Integer>()));
thrown.expect(RuntimeException.class);
thrown.expectMessage("Cannot output with timestamp");
@@ -1324,12 +1331,11 @@ public class ParDoTest implements Serializable {
@Test
@Category(NeedsRunner.class)
public void testParDoShiftTimestampInvalidZeroAllowed() {
-
- pipeline.apply(Create.of(Arrays.asList(3, 42, 6)))
- .apply(ParDo.of(new TestOutputTimestampDoFn()))
- .apply(ParDo.of(new TestShiftTimestampDoFn(Duration.ZERO,
- Duration.millis(-1001))))
- .apply(ParDo.of(new TestFormatTimestampDoFn()));
+ pipeline
+ .apply(Create.of(Arrays.asList(3, 42, 6)))
+ .apply(ParDo.of(new TestOutputTimestampDoFn<Integer>()))
+ .apply(ParDo.of(new TestShiftTimestampDoFn<Integer>(Duration.ZERO, Duration.millis(-1001))))
+ .apply(ParDo.of(new TestFormatTimestampDoFn<Integer>()));
thrown.expect(RuntimeException.class);
thrown.expectMessage("Cannot output with timestamp");
@@ -1339,6 +1345,58 @@ public class ParDoTest implements Serializable {
pipeline.run();
}
+ @Test
+ @Category(RunnableOnService.class)
+ public void testParDoShiftTimestampUnlimited() {
+ PCollection<Long> outputs =
+ pipeline
+ .apply(
+ Create.of(
+ Arrays.asList(
+ 0L,
+ BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis(),
+ GlobalWindow.INSTANCE.maxTimestamp().getMillis())))
+ .apply("AssignTimestampToValue", ParDo.of(new TestOutputTimestampDoFn<Long>()))
+ .apply("ReassignToMinimumTimestamp",
+ ParDo.of(
+ new DoFn<Long, Long>() {
+ @ProcessElement
+ public void reassignTimestamps(ProcessContext context) {
+ // Shift the latest element as far backwards in time as the model permits
+ context.outputWithTimestamp(
+ context.element(), BoundedWindow.TIMESTAMP_MIN_VALUE);
+ }
+
+ @Override
+ public Duration getAllowedTimestampSkew() {
+ return Duration.millis(Long.MAX_VALUE);
+ }
+ }));
+
+ PAssert.that(outputs)
+ .satisfies(
+ new SerializableFunction<Iterable<Long>, Void>() {
+ @Override
+ public Void apply(Iterable<Long> input) {
+ // This element is not shifted backwards in time. It must be present in the output.
+ assertThat(input, hasItem(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()));
+ for (Long elem : input) {
+ // Sanity check the outputs. 0L and the end of the global window are shifted
+ // backwards in time and theoretically could be dropped.
+ assertThat(
+ elem,
+ anyOf(
+ equalTo(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()),
+ equalTo(GlobalWindow.INSTANCE.maxTimestamp().getMillis()),
+ equalTo(0L)));
+ }
+ return null;
+ }
+ });
+
+ pipeline.run();
+ }
+
private static class Checker implements SerializableFunction<Iterable<String>, Void> {
@Override
public Void apply(Iterable<String> input) {
http://git-wip-us.apache.org/repos/asf/beam/blob/9f970fa3/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
index b356dad..d924c14 100644
--- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
+++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java
@@ -174,6 +174,7 @@ public class MongoDbGridFSIO {
.setParser(TEXT_PARSER)
.setCoder(StringUtf8Coder.of())
.setConnectionConfiguration(ConnectionConfiguration.create())
+ .setSkew(Duration.ZERO)
.build();
}
[2/2] beam git commit: This closes #2264
Posted by tg...@apache.org.
This closes #2264
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7c7bb820
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7c7bb820
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7c7bb820
Branch: refs/heads/master
Commit: 7c7bb82096df53603f2885db4efd09560e71a60f
Parents: c31b633 9f970fa
Author: Thomas Groh <tg...@google.com>
Authored: Wed Mar 22 14:04:13 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Mar 22 14:04:13 2017 -0700
----------------------------------------------------------------------
.../beam/runners/core/SimpleDoFnRunner.java | 6 +-
.../beam/runners/core/SimpleDoFnRunnerTest.java | 146 +++++++++++++++++++
.../org/apache/beam/sdk/transforms/DoFn.java | 19 ++-
.../beam/sdk/transforms/WithTimestamps.java | 26 +++-
.../apache/beam/sdk/transforms/ParDoTest.java | 112 ++++++++++----
.../beam/sdk/io/mongodb/MongoDbGridFSIO.java | 1 +
6 files changed, 272 insertions(+), 38 deletions(-)
----------------------------------------------------------------------