You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/06/24 21:24:09 UTC
[2/5] incubator-beam git commit: Key with integers in GatherAllPanes
Key with integers in GatherAllPanes
Ensures that runners which do not support null values can handle
GatherAllPanes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a1365bb2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a1365bb2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a1365bb2
Branch: refs/heads/master
Commit: a1365bb2ca9f75d607e594a20810f53ef9232f9d
Parents: a2ab828
Author: Thomas Groh <tg...@google.com>
Authored: Fri Jun 17 15:19:26 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Jun 24 10:46:45 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/util/GatherAllPanes.java | 12 +++++++-----
1 file changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1365bb2/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
index ab40678..0f2ecd0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
@@ -57,15 +57,17 @@ public class GatherAllPanes<T>
WindowFn<?, ?> originalWindowFn = input.getWindowingStrategy().getWindowFn();
return input
- .apply(WithKeys.<Void, T>of((Void) null).withKeyType(new TypeDescriptor<Void>() {}))
- .apply(new ReifyTimestampsAndWindows<Void, T>())
+ .apply(WithKeys.<Integer, T>of(0).withKeyType(new TypeDescriptor<Integer>() {}))
+ .apply(new ReifyTimestampsAndWindows<Integer, T>())
.apply(
Window.into(
- new IdentityWindowFn<KV<Void, WindowedValue<T>>>(
+ new IdentityWindowFn<KV<Integer, WindowedValue<T>>>(
originalWindowFn.windowCoder()))
- .triggering(Never.ever()))
+ .triggering(Never.ever())
+ .withAllowedLateness(input.getWindowingStrategy().getAllowedLateness())
+ .discardingFiredPanes())
// all values have the same key so they all appear as a single output element
- .apply(GroupByKey.<Void, WindowedValue<T>>create())
+ .apply(GroupByKey.<Integer, WindowedValue<T>>create())
.apply(Values.<Iterable<WindowedValue<T>>>create())
.setWindowingStrategyInternal(input.getWindowingStrategy());
}