You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2014/08/28 11:09:30 UTC

git commit: [FLINK-1074] Fix for NULL input tuples in ProjectJoin

Repository: incubator-flink
Updated Branches:
  refs/heads/master f96a7e0e5 -> 00840599a


[FLINK-1074] Fix for NULL input tuples in ProjectJoin


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/00840599
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/00840599
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/00840599

Branch: refs/heads/master
Commit: 00840599a7a498cbd19d524ab5ad698365cbab4f
Parents: f96a7e0
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Aug 28 10:38:53 2014 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Thu Aug 28 10:38:53 2014 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/operators/JoinOperator.java   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/00840599/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
index 6ffbd1b..2efe7e9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
@@ -925,13 +925,13 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		public void join(T1 in1, T2 in2, Collector<R> out) {
 			for(int i=0; i<fields.length; i++) {
 				if(isFromFirst[i]) {
-					if(fields[i] >= 0) {
+					if(fields[i] >= 0 && in1 != null) {
 						outTuple.setField(((Tuple)in1).getField(fields[i]), i);
 					} else {
 						outTuple.setField(in1, i);
 					}
 				} else {
-					if(fields[i] >= 0) {
+					if(fields[i] >= 0 && in2 != null) {
 						outTuple.setField(((Tuple)in2).getField(fields[i]), i);
 					} else {
 						outTuple.setField(in2, i);