You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2016/03/31 14:01:29 UTC
[1/3] incubator-beam git commit: [flink] improve lifecycle handling
of GroupAlsoByWindowWrapper
Repository: incubator-beam
Updated Branches:
refs/heads/master 0c47cad48 -> 96e286fec
[flink] improve lifecycle handling of GroupAlsoByWindowWrapper
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/033b9240
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/033b9240
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/033b9240
Branch: refs/heads/master
Commit: 033b9240765543438068c1adea6d0cff34ddcd53
Parents: 17863c8
Author: Maximilian Michels <mx...@apache.org>
Authored: Mon Mar 28 11:31:38 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Mar 30 11:31:56 2016 +0200
----------------------------------------------------------------------
.../wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java | 6 ++----
1 file changed, 2 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/033b9240/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
index b413d7a..751d44c 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
@@ -220,6 +220,7 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
public void open() throws Exception {
super.open();
this.context = new ProcessContext(operator, new TimestampedCollector<>(output), this.timerInternals);
+ operator.startBundle(context);
}
/**
@@ -252,11 +253,7 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
private void processKeyedWorkItem(KeyedWorkItem<K, VIN> workItem) throws Exception {
context.setElement(workItem, getStateInternalsForKey(workItem.key()));
-
- // TODO: Ideally startBundle/finishBundle would be called when the operator is first used / about to be discarded.
- operator.startBundle(context);
operator.processElement(context);
- operator.finishBundle(context);
}
@Override
@@ -309,6 +306,7 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
@Override
public void close() throws Exception {
+ operator.finishBundle(context);
super.close();
}
[3/3] incubator-beam git commit: This closes #94.
Posted by mx...@apache.org.
This closes #94.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/96e286fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/96e286fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/96e286fe
Branch: refs/heads/master
Commit: 96e286fec758bb451ff383e6e7c3f2b5bb0cb840
Parents: 0c47cad 63a7c3d
Author: Maximilian Michels <mx...@apache.org>
Authored: Thu Mar 31 11:11:09 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Thu Mar 31 11:11:09 2016 +0200
----------------------------------------------------------------------
.../FlinkGroupAlsoByWindowWrapper.java | 22 +++++++++++++-------
1 file changed, 14 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
[2/3] incubator-beam git commit: [flink] improve readability of
processElement function
Posted by mx...@apache.org.
[flink] improve readability of processElement function
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/63a7c3d0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/63a7c3d0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/63a7c3d0
Branch: refs/heads/master
Commit: 63a7c3d0cb51caf65dc82141671cf28d47c2be39
Parents: 033b924
Author: Maximilian Michels <mx...@apache.org>
Authored: Wed Mar 30 12:02:01 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed Mar 30 12:05:04 2016 +0200
----------------------------------------------------------------------
.../streaming/FlinkGroupAlsoByWindowWrapper.java | 16 ++++++++++++----
1 file changed, 12 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/63a7c3d0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
index 751d44c..3dc5a79 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
@@ -258,10 +258,18 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
@Override
public void processElement(StreamRecord<WindowedValue<KV<K, VIN>>> element) throws Exception {
- ArrayList<WindowedValue<VIN>> elements = new ArrayList<>();
- elements.add(WindowedValue.of(element.getValue().getValue().getValue(), element.getValue().getTimestamp(),
- element.getValue().getWindows(), element.getValue().getPane()));
- processKeyedWorkItem(KeyedWorkItems.elementsWorkItem(element.getValue().getValue().getKey(), elements));
+ final WindowedValue<KV<K, VIN>> windowedValue = element.getValue();
+ final KV<K, VIN> kv = windowedValue.getValue();
+
+ final WindowedValue<VIN> updatedWindowedValue = WindowedValue.of(kv.getValue(),
+ windowedValue.getTimestamp(),
+ windowedValue.getWindows(),
+ windowedValue.getPane());
+
+ processKeyedWorkItem(
+ KeyedWorkItems.elementsWorkItem(
+ kv.getKey(),
+ Collections.singletonList(updatedWindowedValue)));
}
@Override