You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/07/06 17:20:10 UTC
[04/50] [abbrv] incubator-beam git commit: Key with integers in
GatherAllPanes
Key with integers in GatherAllPanes
Ensures that runners which do not support null values can handle
GatherAllPanes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/947a5425
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/947a5425
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/947a5425
Branch: refs/heads/runners-spark2
Commit: 947a54250ef8b2d8e707e52934890b80119ac408
Parents: f146f06
Author: Thomas Groh <tg...@google.com>
Authored: Fri Jun 17 15:19:26 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Wed Jul 6 10:18:49 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/util/GatherAllPanes.java | 12 +++++++-----
1 file changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/947a5425/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
index ab40678..0f2ecd0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
@@ -57,15 +57,17 @@ public class GatherAllPanes<T>
WindowFn<?, ?> originalWindowFn = input.getWindowingStrategy().getWindowFn();
return input
- .apply(WithKeys.<Void, T>of((Void) null).withKeyType(new TypeDescriptor<Void>() {}))
- .apply(new ReifyTimestampsAndWindows<Void, T>())
+ .apply(WithKeys.<Integer, T>of(0).withKeyType(new TypeDescriptor<Integer>() {}))
+ .apply(new ReifyTimestampsAndWindows<Integer, T>())
.apply(
Window.into(
- new IdentityWindowFn<KV<Void, WindowedValue<T>>>(
+ new IdentityWindowFn<KV<Integer, WindowedValue<T>>>(
originalWindowFn.windowCoder()))
- .triggering(Never.ever()))
+ .triggering(Never.ever())
+ .withAllowedLateness(input.getWindowingStrategy().getAllowedLateness())
+ .discardingFiredPanes())
// all values have the same key so they all appear as a single output element
- .apply(GroupByKey.<Void, WindowedValue<T>>create())
+ .apply(GroupByKey.<Integer, WindowedValue<T>>create())
.apply(Values.<Iterable<WindowedValue<T>>>create())
.setWindowingStrategyInternal(input.getWindowingStrategy());
}