You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/16 23:40:01 UTC
[1/2] beam git commit: This closes #2246
Repository: beam
Updated Branches:
refs/heads/master 960f3e660 -> 25b52c5ac
This closes #2246
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/25b52c5a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/25b52c5a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/25b52c5a
Branch: refs/heads/master
Commit: 25b52c5acc3dddf8ee979b1b7a8c201b68c3f268
Parents: 960f3e6 58cc359
Author: Thomas Groh <tg...@google.com>
Authored: Thu Mar 16 16:39:47 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Mar 16 16:39:47 2017 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/util/ReifyTimestamps.java | 76 +++++++++++++
.../org/apache/beam/sdk/util/Reshuffle.java | 19 ++--
.../beam/sdk/util/ReifyTimestampsTest.java | 109 +++++++++++++++++++
.../org/apache/beam/sdk/util/ReshuffleTest.java | 70 +++++++++++-
4 files changed, 265 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
[2/2] beam git commit: Do not Shift Timestamps in Reshuffle
Posted by tg...@apache.org.
Do not Shift Timestamps in Reshuffle
Explicitly reify input timestamps and restore them after the output of
Reshuffle.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/58cc3597
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/58cc3597
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/58cc3597
Branch: refs/heads/master
Commit: 58cc35970665af99a9ba95d3f28e0974149d8f72
Parents: 960f3e6
Author: Thomas Groh <tg...@google.com>
Authored: Tue Mar 14 14:05:44 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Mar 16 16:39:47 2017 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/util/ReifyTimestamps.java | 76 +++++++++++++
.../org/apache/beam/sdk/util/Reshuffle.java | 19 ++--
.../beam/sdk/util/ReifyTimestampsTest.java | 109 +++++++++++++++++++
.../org/apache/beam/sdk/util/ReshuffleTest.java | 70 +++++++++++-
4 files changed, 265 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/58cc3597/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestamps.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestamps.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestamps.java
new file mode 100644
index 0000000..3b291af
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestamps.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Duration;
+
+/**
+ * {@link PTransform PTransforms} for reifying the timestamp of values and re-emitting the original
+ * value with the original timestamp.
+ */
+public final class ReifyTimestamps {
+  /** Static utility holder; not instantiable. */
+  private ReifyTimestamps() {}
+
+  /**
+   * Create a {@link PTransform} that will output all input {@link KV KVs} with the timestamp inside
+   * the value.
+   *
+   * <p>Each output element has the key of its input element and, as its value, a
+   * {@link TimestampedValue} pairing the original value with the input element's timestamp.
+   */
+  public static <K, V>
+      PTransform<PCollection<? extends KV<K, V>>, PCollection<KV<K, TimestampedValue<V>>>>
+          inValues() {
+    return ParDo.of(new ReifyValueTimestampDoFn<K, V>());
+  }
+
+  /**
+   * Create a {@link PTransform} that consumes {@link KV KVs} with a {@link TimestampedValue} as the
+   * value, and outputs the {@link KV} of the input key and value at the timestamp specified by the
+   * {@link TimestampedValue}.
+   *
+   * <p>The restored timestamp may be earlier than the timestamp of the element being processed,
+   * for example after a grouping operation moved timestamps forward. This is permitted. Elements
+   * moved backwards in time may be considered late by downstream windowing.
+   */
+  public static <K, V>
+      PTransform<PCollection<? extends KV<K, TimestampedValue<V>>>, PCollection<KV<K, V>>>
+          extractFromValues() {
+    return ParDo.of(new ExtractTimestampedValueDoFn<K, V>());
+  }
+
+  /** Wraps each input value together with the input element's timestamp. */
+  private static class ReifyValueTimestampDoFn<K, V>
+      extends DoFn<KV<K, V>, KV<K, TimestampedValue<V>>> {
+    @ProcessElement
+    public void processElement(ProcessContext context) {
+      KV<K, V> kv = context.element();
+      context.output(KV.of(kv.getKey(), TimestampedValue.of(kv.getValue(), context.timestamp())));
+    }
+  }
+
+  /** Unwraps each {@link TimestampedValue} and outputs the key and value at its timestamp. */
+  private static class ExtractTimestampedValueDoFn<K, V>
+      extends DoFn<KV<K, TimestampedValue<V>>, KV<K, V>> {
+    /**
+     * Allows output at any timestamp, including ones earlier than the current element's.
+     *
+     * <p>With the default skew of {@link Duration#ZERO}, {@code outputWithTimestamp} rejects any
+     * timestamp earlier than the current element's timestamp. That is exactly the case this
+     * transform exists to undo: restoring original timestamps after they were shifted forward.
+     */
+    @Override
+    public Duration getAllowedTimestampSkew() {
+      return Duration.millis(Long.MAX_VALUE);
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext context) {
+      KV<K, TimestampedValue<V>> kv = context.element();
+      context.outputWithTimestamp(
+          KV.of(kv.getKey(), kv.getValue().getValue()), kv.getValue().getTimestamp());
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/58cc3597/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
index 4d86c74..e80bc17 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
@@ -22,9 +22,11 @@ import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
/**
@@ -55,28 +57,31 @@ public class Reshuffle<K, V> extends PTransform<PCollection<KV<K, V>>, PCollecti
// If the input has already had its windows merged, then the GBK that performed the merge
// will have set originalStrategy.getWindowFn() to InvalidWindows, causing the GBK contained
// here to fail. Instead, we install a valid WindowFn that leaves all windows unchanged.
+ // The OutputTimeFn is set to ensure the GroupByKey does not shift elements forwards in time.
+ // Because this outputs as fast as possible, this should not hold the watermark.
Window.Bound<KV<K, V>> rewindow =
- Window.<KV<K, V>>into(
- new IdentityWindowFn<>(
- originalStrategy.getWindowFn().windowCoder()))
+ Window.<KV<K, V>>into(new IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder()))
.triggering(new ReshuffleTrigger<>())
.discardingFiredPanes()
+ .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
.withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
return input.apply(rewindow)
- .apply(GroupByKey.<K, V>create())
+ .apply("ReifyOriginalTimestamps", ReifyTimestamps.<K, V>inValues())
+ .apply(GroupByKey.<K, TimestampedValue<V>>create())
// Set the windowing strategy directly, so that it doesn't get counted as the user having
// set allowed lateness.
.setWindowingStrategyInternal(originalStrategy)
.apply("ExpandIterable", ParDo.of(
- new DoFn<KV<K, Iterable<V>>, KV<K, V>>() {
+ new DoFn<KV<K, Iterable<TimestampedValue<V>>>, KV<K, TimestampedValue<V>>>() {
@ProcessElement
public void processElement(ProcessContext c) {
K key = c.element().getKey();
- for (V value : c.element().getValue()) {
+ for (TimestampedValue<V> value : c.element().getValue()) {
c.output(KV.of(key, value));
}
}
- }));
+ }))
+ .apply("RestoreOriginalTimestamps", ReifyTimestamps.<K, V>extractFromValues());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/58cc3597/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReifyTimestampsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReifyTimestampsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReifyTimestampsTest.java
new file mode 100644
index 0000000..b78de8e
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReifyTimestampsTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.RunnableOnService;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.WithTimestamps;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link ReifyTimestamps}.
+ */
+@RunWith(JUnit4.class)
+public class ReifyTimestampsTest implements Serializable {
+ @Rule public transient TestPipeline pipeline = TestPipeline.create();
+
+ @Test
+ @Category(RunnableOnService.class)
+ public void inValuesSucceeds() {
+ PCollection<KV<String, Integer>> timestamped =
+ pipeline
+ .apply(Create.of(KV.of("foo", 0), KV.of("foo", 1), KV.of("bar", 2), KV.of("baz", 3)))
+ .apply(
+ WithTimestamps.of(
+ new SerializableFunction<KV<String, Integer>, Instant>() {
+ @Override
+ public Instant apply(KV<String, Integer> input) {
+ return new Instant(input.getValue().longValue());
+ }
+ }));
+
+ PCollection<KV<String, TimestampedValue<Integer>>> reified =
+ timestamped.apply(ReifyTimestamps.<String, Integer>inValues());
+
+ PAssert.that(reified)
+ .containsInAnyOrder(
+ KV.of("foo", TimestampedValue.of(0, new Instant(0))),
+ KV.of("foo", TimestampedValue.of(1, new Instant(1))),
+ KV.of("bar", TimestampedValue.of(2, new Instant(2))),
+ KV.of("baz", TimestampedValue.of(3, new Instant(3))));
+
+ pipeline.run();
+ }
+
+ @Test
+ @Category(RunnableOnService.class)
+ public void extractFromValuesSucceeds() {
+ PCollection<KV<String, TimestampedValue<Integer>>> preified =
+ pipeline.apply(
+ Create.of(
+ KV.of("foo", TimestampedValue.of(0, new Instant((0)))),
+ KV.of("foo", TimestampedValue.of(1, new Instant(1))),
+ KV.of("bar", TimestampedValue.of(2, new Instant(2))),
+ KV.of("baz", TimestampedValue.of(3, new Instant(3)))));
+
+ PCollection<KV<String, Integer>> timestamped =
+ preified.apply(ReifyTimestamps.<String, Integer>extractFromValues());
+
+ PAssert.that(timestamped)
+ .containsInAnyOrder(KV.of("foo", 0), KV.of("foo", 1), KV.of("bar", 2), KV.of("baz", 3));
+
+ timestamped.apply(
+ "AssertElementTimestamps",
+ ParDo.of(
+ new DoFn<KV<String, Integer>, Void>() {
+ @ProcessElement
+ public void verifyTimestampsEqualValue(ProcessContext context) {
+ assertThat(
+ new Instant(context.element().getValue().longValue()),
+ equalTo(context.timestamp()));
+ }
+ }));
+
+ pipeline.run();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/58cc3597/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTest.java
index d47cddc..81a6d82 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTest.java
@@ -17,9 +17,12 @@
*/
package org.apache.beam.sdk.util;
+import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
import java.util.List;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -29,12 +32,19 @@ import org.apache.beam.sdk.testing.RunnableOnService;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
+import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -45,7 +55,7 @@ import org.junit.runners.JUnit4;
* Tests for {@link Reshuffle}.
*/
@RunWith(JUnit4.class)
-public class ReshuffleTest {
+public class ReshuffleTest implements Serializable {
private static final List<KV<String, Integer>> ARBITRARY_KVS = ImmutableList.of(
KV.of("k1", 3),
@@ -66,7 +76,7 @@ public class ReshuffleTest {
KV.of("k2", (Iterable<Integer>) ImmutableList.of(4)));
@Rule
- public final TestPipeline pipeline = TestPipeline.create();
+ public final transient TestPipeline pipeline = TestPipeline.create();
@Test
@Category(RunnableOnService.class)
@@ -88,6 +98,62 @@ public class ReshuffleTest {
pipeline.run();
}
+  /**
+   * Verifies that applying a {@link Reshuffle} under the default {@link WindowingStrategy} leaves
+   * each element's timestamp untouched. The inputs include the minimum timestamp and the global
+   * window's maximum timestamp.
+   */
+  @Test
+  @Category(RunnableOnService.class)
+  public void testReshufflePreservesTimestamps() {
+    PCollection<String> timestampedStrings =
+        pipeline.apply(
+            Create.timestamped(
+                    TimestampedValue.of("foo", BoundedWindow.TIMESTAMP_MIN_VALUE),
+                    TimestampedValue.of("foo", new Instant(0)),
+                    TimestampedValue.of("bar", new Instant(33)),
+                    TimestampedValue.of("bar", GlobalWindow.INSTANCE.maxTimestamp()))
+                .withCoder(StringUtf8Coder.of()));
+
+    // Key every element by itself, then capture its pre-reshuffle timestamp inside the value.
+    PCollection<KV<String, TimestampedValue<String>>> input =
+        timestampedStrings
+            .apply(
+                WithKeys.of(
+                    new SerializableFunction<String, String>() {
+                      @Override
+                      public String apply(String element) {
+                        return element;
+                      }
+                    }))
+            .apply("ReifyOriginalTimestamps", ReifyTimestamps.<String, String>inValues());
+
+    PCollection<KV<String, TimestampedValue<String>>> reshuffled =
+        input.apply(Reshuffle.<String, TimestampedValue<String>>of());
+
+    // Outer TimestampedValue: the timestamp observed after the reshuffle.
+    // Inner TimestampedValue: the timestamp the element carried before the reshuffle.
+    PCollection<TimestampedValue<TimestampedValue<String>>> output =
+        reshuffled
+            .apply(
+                "ReifyReshuffledTimestamps",
+                ReifyTimestamps.<String, TimestampedValue<String>>inValues())
+            .apply(Values.<TimestampedValue<TimestampedValue<String>>>create());
+
+    PAssert.that(output)
+        .satisfies(
+            new SerializableFunction<Iterable<TimestampedValue<TimestampedValue<String>>>, Void>() {
+              @Override
+              public Void apply(Iterable<TimestampedValue<TimestampedValue<String>>> elements) {
+                for (TimestampedValue<TimestampedValue<String>> nested : elements) {
+                  assertThat(
+                      "Reshuffle must preserve element timestamps",
+                      nested.getTimestamp(),
+                      equalTo(nested.getValue().getTimestamp()));
+                }
+                return null;
+              }
+            });
+
+    pipeline.run();
+  }
+
@Test
@Category(RunnableOnService.class)
public void testReshuffleAfterSessionsAndGroupByKey() {