You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/08/07 23:53:52 UTC
[27/50] [abbrv] beam git commit: Respect WindowFn#getOutputTime in
gearpump-runner
Respect WindowFn#getOutputTime in gearpump-runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/98854d4d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/98854d4d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/98854d4d
Branch: refs/heads/master
Commit: 98854d4d01ca526ea4a44dc077d2cfb4cddf9914
Parents: 3c7e3e6
Author: manuzhang <ow...@gmail.com>
Authored: Fri May 19 09:19:42 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Mon Jun 5 19:16:53 2017 +0800
----------------------------------------------------------------------
.../gearpump/translators/GroupByKeyTranslator.java | 12 ++++++++----
.../gearpump/translators/GroupByKeyTranslatorTest.java | 8 ++++----
2 files changed, 12 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/98854d4d/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
index 521f665..7d944a4 100644
--- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
+++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java
@@ -74,7 +74,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
new GearpumpWindowFn(windowFn.isNonMerging()),
EventTimeTrigger$.MODULE$, Discarding$.MODULE$), "assign_window")
.groupBy(new GroupByFn<K, V>(inputKeyCoder), parallelism, "group_by_Key_and_Window")
- .map(new KeyedByTimestamp<K, V>(timestampCombiner), "keyed_by_timestamp")
+ .map(new KeyedByTimestamp<K, V>(windowFn, timestampCombiner), "keyed_by_timestamp")
.fold(new Merge<>(windowFn, timestampCombiner), "merge")
.map(new Values<K, V>(), "values");
@@ -146,17 +146,21 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe
extends MapFunction<WindowedValue<KV<K, V>>,
KV<Instant, WindowedValue<KV<K, V>>>> {
+ private final WindowFn<KV<K, V>, BoundedWindow> windowFn;
private final TimestampCombiner timestampCombiner;
- public KeyedByTimestamp(TimestampCombiner timestampCombiner) {
+ public KeyedByTimestamp(WindowFn<KV<K, V>, BoundedWindow> windowFn,
+ TimestampCombiner timestampCombiner) {
+ this.windowFn = windowFn;
this.timestampCombiner = timestampCombiner;
}
@Override
public KV<org.joda.time.Instant, WindowedValue<KV<K, V>>> map(
WindowedValue<KV<K, V>> wv) {
- Instant timestamp = timestampCombiner.assign(
- Iterables.getOnlyElement(wv.getWindows()), wv.getTimestamp());
+ BoundedWindow window = Iterables.getOnlyElement(wv.getWindows());
+ Instant timestamp = timestampCombiner.assign(window
+ , windowFn.getOutputTime(wv.getTimestamp(), window));
return KV.of(timestamp, wv);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/98854d4d/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java
index 86b60aa..d5b931b 100644
--- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java
+++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java
@@ -22,7 +22,6 @@ import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.time.Instant;
@@ -95,18 +94,19 @@ public class GroupByKeyTranslatorTest {
@Test
@SuppressWarnings({"rawtypes", "unchecked"})
public void testKeyedByTimestamp() {
+ WindowFn slidingWindows = Sessions.withGapDuration(Duration.millis(10));
BoundedWindow window =
new IntervalWindow(new org.joda.time.Instant(0), new org.joda.time.Instant(10));
GroupByKeyTranslator.KeyedByTimestamp keyedByTimestamp =
- new GroupByKeyTranslator.KeyedByTimestamp(timestampCombiner);
+ new GroupByKeyTranslator.KeyedByTimestamp(slidingWindows, timestampCombiner);
WindowedValue<KV<String, String>> value =
WindowedValue.of(
KV.of("key", "val"), org.joda.time.Instant.now(), window, PaneInfo.NO_FIRING);
KV<org.joda.time.Instant, WindowedValue<KV<String, String>>> result =
keyedByTimestamp.map(value);
org.joda.time.Instant time =
- timestampCombiner.assign(Iterables.getOnlyElement(value.getWindows()),
- value.getTimestamp());
+ timestampCombiner.assign(window,
+ slidingWindows.getOutputTime(value.getTimestamp(), window));
assertThat(result, equalTo(KV.of(time, value)));
}