You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/03/18 16:50:51 UTC
flink git commit: [FLINK-1641] Make projection operator chainable.
Repository: flink
Updated Branches:
refs/heads/master ef856a920 -> 1930678fb
[FLINK-1641] Make projection operator chainable.
Closes #489
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1930678f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1930678f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1930678f
Branch: refs/heads/master
Commit: 1930678fb534baeb427097520ef4b5170d3a24cd
Parents: ef856a9
Author: Gabor Gevay <gg...@gmail.com>
Authored: Tue Mar 17 03:02:35 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Wed Mar 18 15:22:18 2015 +0100
----------------------------------------------------------------------
.../api/invokable/operator/ProjectInvokable.java | 14 +++++++++++---
1 file changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1930678f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
index 3e47107..8f57fe7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
@@ -21,9 +21,9 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.invokable.ChainableInvokable;
-public class ProjectInvokable<IN, OUT extends Tuple> extends StreamInvokable<IN, OUT> {
+public class ProjectInvokable<IN, OUT extends Tuple> extends ChainableInvokable<IN, OUT> {
private static final long serialVersionUID = 1L;
transient OUT outTuple;
@@ -49,7 +49,7 @@ public class ProjectInvokable<IN, OUT extends Tuple> extends StreamInvokable<IN,
@Override
protected void callUserFunction() throws Exception {
for (int i = 0; i < this.numFields; i++) {
- outTuple.setField(nextRecord.getField(fields[i]), i);
+ outTuple.setField(((Tuple)nextObject).getField(fields[i]), i);
}
collector.collect(outTuple);
}
@@ -60,4 +60,12 @@ public class ProjectInvokable<IN, OUT extends Tuple> extends StreamInvokable<IN,
this.outTypeSerializer = outTypeInformation.createSerializer(executionConfig);
outTuple = outTypeSerializer.createInstance();
}
+
+ @Override
+ public void collect(IN record) {
+ if (isRunning) {
+ nextObject = copy(record);
+ callUserFunctionAndLogException();
+ }
+ }
}