You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/24 16:10:58 UTC

[04/14] incubator-beam git commit: Port ReifyTimestampAndWindowsDoFn to RequiresWindowAccess

Port ReifyTimestampAndWindowsDoFn to RequiresWindowAccess

This should become either a DoFn or, probably more appropriately,
something internal to just those runners that actually require it to be
manifested as a DoFn at all.

As an intermediate migration step, this reduces its dependence on
unsupported APIs.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b2350417
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b2350417
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b2350417

Branch: refs/heads/master
Commit: b2350417f73ae6c34f849ff0e93d5bd93df3088d
Parents: 164ee56
Author: Kenneth Knowles <kl...@google.com>
Authored: Sun Oct 23 19:45:41 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 23 19:52:51 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/util/ReifyTimestampAndWindowsDoFn.java | 16 +++++-----------
 1 file changed, 5 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b2350417/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java
index 8f3f540..6da4da0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.util;
 
 import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
 import org.apache.beam.sdk.values.KV;
 
 /**
@@ -28,20 +29,13 @@ import org.apache.beam.sdk.values.KV;
  * @param <V> the type of the values of the input {@code PCollection}
  */
 @SystemDoFnInternal
-public class ReifyTimestampAndWindowsDoFn<K, V>
-    extends OldDoFn<KV<K, V>, KV<K, WindowedValue<V>>> {
+public class ReifyTimestampAndWindowsDoFn<K, V> extends OldDoFn<KV<K, V>, KV<K, WindowedValue<V>>>
+    implements RequiresWindowAccess {
   @Override
-  public void processElement(ProcessContext c)
-      throws Exception {
+  public void processElement(ProcessContext c) throws Exception {
     KV<K, V> kv = c.element();
     K key = kv.getKey();
     V value = kv.getValue();
-    c.output(KV.of(
-        key,
-        WindowedValue.of(
-            value,
-            c.timestamp(),
-            c.windowingInternals().windows(),
-            c.pane())));
+    c.output(KV.of(key, WindowedValue.of(value, c.timestamp(), c.window(), c.pane())));
   }
 }