You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/06/24 21:24:09 UTC

[2/5] incubator-beam git commit: Key with integers in GatherAllPanes

Key with integers in GatherAllPanes

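Ensures that runners which do not support null values can handle
GatherAllPanes by keying every element with the integer 0 instead of a
null Void key. The intermediate windowing step now also carries over the
input's allowed lateness and explicitly uses discarding fired panes.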


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a1365bb2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a1365bb2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a1365bb2

Branch: refs/heads/master
Commit: a1365bb2ca9f75d607e594a20810f53ef9232f9d
Parents: a2ab828
Author: Thomas Groh <tg...@google.com>
Authored: Fri Jun 17 15:19:26 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Jun 24 10:46:45 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/util/GatherAllPanes.java   | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1365bb2/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
index ab40678..0f2ecd0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java
@@ -57,15 +57,17 @@ public class GatherAllPanes<T>
     WindowFn<?, ?> originalWindowFn = input.getWindowingStrategy().getWindowFn();
 
     return input
-        .apply(WithKeys.<Void, T>of((Void) null).withKeyType(new TypeDescriptor<Void>() {}))
-        .apply(new ReifyTimestampsAndWindows<Void, T>())
+        .apply(WithKeys.<Integer, T>of(0).withKeyType(new TypeDescriptor<Integer>() {}))
+        .apply(new ReifyTimestampsAndWindows<Integer, T>())
         .apply(
             Window.into(
-                    new IdentityWindowFn<KV<Void, WindowedValue<T>>>(
+                    new IdentityWindowFn<KV<Integer, WindowedValue<T>>>(
                         originalWindowFn.windowCoder()))
-                .triggering(Never.ever()))
+                .triggering(Never.ever())
+                .withAllowedLateness(input.getWindowingStrategy().getAllowedLateness())
+                .discardingFiredPanes())
         // all values have the same key so they all appear as a single output element
-        .apply(GroupByKey.<Void, WindowedValue<T>>create())
+        .apply(GroupByKey.<Integer, WindowedValue<T>>create())
         .apply(Values.<Iterable<WindowedValue<T>>>create())
         .setWindowingStrategyInternal(input.getWindowingStrategy());
   }