You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/03/18 16:50:51 UTC

flink git commit: [FLINK-1641] Make projection operator chainable.

Repository: flink
Updated Branches:
  refs/heads/master ef856a920 -> 1930678fb


[FLINK-1641] Make projection operator chainable.

Closes #489


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1930678f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1930678f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1930678f

Branch: refs/heads/master
Commit: 1930678fb534baeb427097520ef4b5170d3a24cd
Parents: ef856a9
Author: Gabor Gevay <gg...@gmail.com>
Authored: Tue Mar 17 03:02:35 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Wed Mar 18 15:22:18 2015 +0100

----------------------------------------------------------------------
 .../api/invokable/operator/ProjectInvokable.java      | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1930678f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
index 3e47107..8f57fe7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/ProjectInvokable.java
@@ -21,9 +21,9 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.invokable.ChainableInvokable;
 
-public class ProjectInvokable<IN, OUT extends Tuple> extends StreamInvokable<IN, OUT> {
+public class ProjectInvokable<IN, OUT extends Tuple> extends ChainableInvokable<IN, OUT> {
 	private static final long serialVersionUID = 1L;
 
 	transient OUT outTuple;
@@ -49,7 +49,7 @@ public class ProjectInvokable<IN, OUT extends Tuple> extends StreamInvokable<IN,
 	@Override
 	protected void callUserFunction() throws Exception {
 		for (int i = 0; i < this.numFields; i++) {
-			outTuple.setField(nextRecord.getField(fields[i]), i);
+			outTuple.setField(((Tuple)nextObject).getField(fields[i]), i);
 		}
 		collector.collect(outTuple);
 	}
@@ -60,4 +60,12 @@ public class ProjectInvokable<IN, OUT extends Tuple> extends StreamInvokable<IN,
 		this.outTypeSerializer = outTypeInformation.createSerializer(executionConfig);
 		outTuple = outTypeSerializer.createInstance();
 	}
+
+	@Override
+	public void collect(IN record) {
+		if (isRunning) {
+			nextObject = copy(record);
+			callUserFunctionAndLogException();
+		}
+	}
 }