You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/24 16:10:58 UTC
[04/14] incubator-beam git commit: Port ReifyTimestampAndWindowsDoFn
to RequiresWindowAccess
Port ReifyTimestampAndWindowsDoFn to RequiresWindowAccess
This should become either a DoFn or, probably more appropriately,
just something internal to the runners that actually require it to be
manifested as a DoFn at all.
As an intermediate migration step, this change reduces its dependence
on unsupported APIs: it now calls c.window() instead of
c.windowingInternals().windows().
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b2350417
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b2350417
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b2350417
Branch: refs/heads/master
Commit: b2350417f73ae6c34f849ff0e93d5bd93df3088d
Parents: 164ee56
Author: Kenneth Knowles <kl...@google.com>
Authored: Sun Oct 23 19:45:41 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 23 19:52:51 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/util/ReifyTimestampAndWindowsDoFn.java | 16 +++++-----------
1 file changed, 5 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b2350417/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java
index 8f3f540..6da4da0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestampAndWindowsDoFn.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.util;
import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
import org.apache.beam.sdk.values.KV;
/**
@@ -28,20 +29,13 @@ import org.apache.beam.sdk.values.KV;
* @param <V> the type of the values of the input {@code PCollection}
*/
@SystemDoFnInternal
-public class ReifyTimestampAndWindowsDoFn<K, V>
- extends OldDoFn<KV<K, V>, KV<K, WindowedValue<V>>> {
+public class ReifyTimestampAndWindowsDoFn<K, V> extends OldDoFn<KV<K, V>, KV<K, WindowedValue<V>>>
+ implements RequiresWindowAccess {
@Override
- public void processElement(ProcessContext c)
- throws Exception {
+ public void processElement(ProcessContext c) throws Exception {
KV<K, V> kv = c.element();
K key = kv.getKey();
V value = kv.getValue();
- c.output(KV.of(
- key,
- WindowedValue.of(
- value,
- c.timestamp(),
- c.windowingInternals().windows(),
- c.pane())));
+ c.output(KV.of(key, WindowedValue.of(value, c.timestamp(), c.window(), c.pane())));
}
}