You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2014/11/11 09:59:54 UTC

incubator-flink git commit: [FLINK-1186] Fix wrong flat key position when using expression key for nested tuples

Repository: incubator-flink
Updated Branches:
  refs/heads/release-0.7 d85acb16d -> 44b24ae28


[FLINK-1186] Fix wrong flat key position when using expression key for nested tuples


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/44b24ae2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/44b24ae2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/44b24ae2

Branch: refs/heads/release-0.7
Commit: 44b24ae2874cf183cf516b62887c0a74c50dbb2a
Parents: d85acb1
Author: mingliang <qm...@gmail.com>
Authored: Tue Oct 28 19:24:25 2014 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Mon Nov 10 18:02:03 2014 +0100

----------------------------------------------------------------------
 .../api/java/typeutils/TupleTypeInfoBase.java   |  6 ++++-
 .../flink/test/javaApiOperators/JoinITCase.java | 23 +++++++++++++++++++-
 2 files changed, 27 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/44b24ae2/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
index dc75b2c..4d8a81e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
@@ -145,7 +145,11 @@ public abstract class TupleTypeInfoBase<T> extends CompositeType<T> {
 				throw new RuntimeException("Element at position "+pos+" is not a composite type. There are no nested types to select");
 			}
 			CompositeType<?> cType = (CompositeType<?>) types[pos];
-			cType.getKey(rem, offset + pos, result);
+			// count nested fields before "pos"
+			for (int i = 0; i < pos; i++) {
+				offset += types[i].getTotalFields();
+			}
+			cType.getKey(rem, offset, result);
 			return;
 		}
 		

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/44b24ae2/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
index bfae922..7cfb867 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
@@ -48,7 +48,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 @RunWith(Parameterized.class)
 public class JoinITCase extends JavaProgramTestBase {
 	
-	private static int NUM_PROGRAMS = 22;
+	private static int NUM_PROGRAMS = 23;
 	
 	private int curProgId = config.getInteger("ProgramId", -1);
 	private String resultPath;
@@ -662,6 +662,27 @@ public class JoinITCase extends JavaProgramTestBase {
 						"2 Second (20,200,2000,Two) 20000,(20000,20,200,2000,Two,2,Second)\n"+
 						"3 Third (30,300,3000,Three) 30000,(30000,30,300,3000,Three,3,Third)\n";
 			}
+			case 23: {
+				/*
+				 * Non-POJO test to verify "nested" tuple-element selection with the first key field greater than 0.
+				 */
+					final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+					DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+					DataSet<Tuple2<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>> ds2 = ds1.join(ds1).where(0).equalTo(0);
+					DataSet<Tuple2<Tuple2<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>, Tuple2<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>>> joinDs =
+						ds2.join(ds2).where("f1.f0").equalTo("f0.f0");
+
+					joinDs.writeAsCsv(resultPath);
+					env.setDegreeOfParallelism(1);
+					env.execute();
+
+					// return expected result
+					return "((1,1,Hi),(1,1,Hi)),((1,1,Hi),(1,1,Hi))\n" +
+						"((2,2,Hello),(2,2,Hello)),((2,2,Hello),(2,2,Hello))\n" +
+						"((3,2,Hello world),(3,2,Hello world)),((3,2,Hello world),(3,2,Hello world))\n";
+
+				}
 			default: 
 				throw new IllegalArgumentException("Invalid program id: "+progId);
 			}