You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2016/06/13 12:58:21 UTC
[1/2] incubator-beam git commit: [flink] fix potential NPE in
ParDoWrapper
Repository: incubator-beam
Updated Branches:
refs/heads/master 60964b611 -> be05942da
[flink] fix potential NPE in ParDoWrapper
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a2abc6a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a2abc6a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a2abc6a2
Branch: refs/heads/master
Commit: a2abc6a249cdc4e6000d1539df6c3b5cde8d39b0
Parents: 60964b6
Author: Maximilian Michels <mx...@apache.org>
Authored: Fri Jun 10 14:26:45 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Mon Jun 13 14:56:54 2016 +0200
----------------------------------------------------------------------
.../wrappers/streaming/FlinkAbstractParDoWrapper.java | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a2abc6a2/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
index a935011..3c37aa9 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java
@@ -70,18 +70,21 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFl
@Override
public void open(Configuration parameters) throws Exception {
- this.doFn.startBundle(context);
}
@Override
public void close() throws Exception {
- this.doFn.finishBundle(context);
+ if (this.context != null) {
+ // we have initialized the context
+ this.doFn.finishBundle(this.context);
+ }
}
@Override
public void flatMap(WindowedValue<IN> value, Collector<WindowedValue<OUTFL>> out) throws Exception {
if (this.context == null) {
this.context = new DoFnProcessContext(doFn, out);
+ this.doFn.startBundle(this.context);
}
// for each window the element belongs to, create a new copy here.
@@ -98,7 +101,7 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFl
private void processElement(WindowedValue<IN> value) throws Exception {
this.context.setElement(value);
- doFn.processElement(context);
+ doFn.processElement(this.context);
}
private class DoFnProcessContext extends DoFn<IN, OUTDF>.ProcessContext {
[2/2] incubator-beam git commit: This closes #450
Posted by mx...@apache.org.
This closes #450
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/be05942d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/be05942d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/be05942d
Branch: refs/heads/master
Commit: be05942da0f09a247e195d1d29513ae40e1a95e0
Parents: 60964b6 a2abc6a
Author: Maximilian Michels <mx...@apache.org>
Authored: Mon Jun 13 14:57:31 2016 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Mon Jun 13 14:57:31 2016 +0200
----------------------------------------------------------------------
.../wrappers/streaming/FlinkAbstractParDoWrapper.java | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------