You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/08/04 21:11:00 UTC

[2/2] flink git commit: [FLINK-2442] [fix] FieldPositionKeys support Pojo fields

[FLINK-2442] [fix] FieldPositionKeys support Pojo fields


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/80d3478c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/80d3478c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/80d3478c

Branch: refs/heads/release-0.9
Commit: 80d3478c0be6c346c0d33d966385c3db4eb06769
Parents: 8797890
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Jul 30 21:44:06 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Aug 4 21:10:19 2015 +0200

----------------------------------------------------------------------
 .../apache/flink/api/java/operators/Keys.java   | 50 ++++++++++----------
 .../api/java/typeutils/TupleTypeInfoBase.java   | 20 --------
 .../flink/api/java/operators/KeysTest.java      | 27 +++++++++++
 3 files changed, 52 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/80d3478c/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
index 69d306f..09874e5 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
@@ -223,43 +223,43 @@ public abstract class Keys<T> {
 			} else {
 				groupingFields = rangeCheckFields(groupingFields, type.getArity() -1);
 			}
-			CompositeType<?> compositeType = (CompositeType<?>) type;
 			Preconditions.checkArgument(groupingFields.length > 0, "Grouping fields can not be empty at this point");
 			
 			keyFields = new ArrayList<FlatFieldDescriptor>(type.getTotalFields());
 			// for each key, find the field:
 			for(int j = 0; j < groupingFields.length; j++) {
+				int keyPos = groupingFields[j];
+
+				int offset = 0;
 				for(int i = 0; i < type.getArity(); i++) {
-					TypeInformation<?> fieldType = compositeType.getTypeAt(i);
-					
-					if(groupingFields[j] == i) { // check if user set the key
-						int keyId = countNestedElementsBefore(compositeType, i) + i;
-						if(fieldType instanceof TupleTypeInfoBase) {
-							TupleTypeInfoBase<?> tupleFieldType = (TupleTypeInfoBase<?>) fieldType;
-							tupleFieldType.addAllFields(keyId, keyFields);
-						} else {
-							Preconditions.checkArgument(fieldType instanceof AtomicType, "Wrong field type");
-							keyFields.add(new FlatFieldDescriptor(keyId, fieldType));
+
+					TypeInformation fieldType = ((CompositeType<?>) type).getTypeAt(i);
+					if(i < keyPos) {
+						// not yet there, increment key offset
+						offset += fieldType.getTotalFields();
+					}
+					else {
+						// arrived at key position
+						if(fieldType instanceof CompositeType) {
+							// add all nested fields of composite type
+							((CompositeType) fieldType).getFlatFields("*", offset, keyFields);
 						}
-						
+						else if(fieldType instanceof AtomicType) {
+							// add atomic type field
+							keyFields.add(new FlatFieldDescriptor(offset, fieldType));
+						}
+						else {
+							// type should either be composite or atomic
+							throw new InvalidProgramException("Field type is neither CompositeType nor AtomicType: "+fieldType);
+						}
+						// go to next key
+						break;
 					}
 				}
 			}
 			keyFields = removeNullElementsFromList(keyFields);
 		}
-		
-		private static int countNestedElementsBefore(CompositeType<?> compositeType, int pos) {
-			if( pos == 0) {
-				return 0;
-			}
-			int ret = 0;
-			for (int i = 0; i < pos; i++) {
-				TypeInformation<?> fieldType = compositeType.getTypeAt(i);
-				ret += fieldType.getTotalFields() -1;
-			}
-			return ret;
-		}
-		
+
 		public static <R> List<R> removeNullElementsFromList(List<R> in) {
 			List<R> elements = new ArrayList<R>();
 			for(R e: in) {

http://git-wip-us.apache.org/repos/asf/flink/blob/80d3478c/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
index 5051449..c38dc77 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
@@ -23,7 +23,6 @@ import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.apache.flink.api.common.typeinfo.AtomicType;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.CompositeType;
 import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
@@ -84,25 +83,6 @@ public abstract class TupleTypeInfoBase<T> extends CompositeType<T> {
 		return tupleType;
 	}
 
-	/**
-	 * Recursively add all fields in this tuple type. We need this in particular to get all
-	 * the types.
-	 * @param startKeyId
-	 * @param keyFields
-	 */
-	public void addAllFields(int startKeyId, List<FlatFieldDescriptor> keyFields) {
-		for(int i = 0; i < this.getArity(); i++) {
-			TypeInformation<?> type = this.types[i];
-			if(type instanceof AtomicType) {
-				keyFields.add(new FlatFieldDescriptor(startKeyId, type));
-			} else if(type instanceof TupleTypeInfoBase<?>) {
-				TupleTypeInfoBase<?> ttb = (TupleTypeInfoBase<?>) type;
-				ttb.addAllFields(startKeyId, keyFields);
-			}
-			startKeyId += type.getTotalFields();
-		}
-	}
-
 	@Override
 	public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/80d3478c/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java
index 67d0240..cf8936d 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple7;
 import org.apache.flink.api.java.type.extractor.PojoTypeExtractionTest.ComplexNestedClass;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.junit.Assert;
@@ -254,4 +255,30 @@ public class KeysTest {
 		ek = new ExpressionKeys<PojoWithMultiplePojos>(new String[]{"i0"}, ti);
 		Assert.assertArrayEquals(new int[] {0}, ek.computeLogicalKeyPositions());
 	}
+
+	@Test
+	public void testTupleWithNestedPojo() {
+
+		TypeInformation<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>> ti =
+				new TupleTypeInfo<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>>(
+					BasicTypeInfo.INT_TYPE_INFO,
+					TypeExtractor.getForClass(Pojo1.class),
+					TypeExtractor.getForClass(PojoWithMultiplePojos.class)
+				);
+
+		ExpressionKeys<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>> ek;
+
+		ek = new ExpressionKeys<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>>(new int[]{0}, ti);
+		Assert.assertArrayEquals(new int[] {0}, ek.computeLogicalKeyPositions());
+
+		ek = new ExpressionKeys<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>>(new int[]{1}, ti);
+		Assert.assertArrayEquals(new int[] {1,2}, ek.computeLogicalKeyPositions());
+
+		ek = new ExpressionKeys<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>>(new int[]{2}, ti);
+		Assert.assertArrayEquals(new int[] {3,4,5,6,7}, ek.computeLogicalKeyPositions());
+
+		ek = new ExpressionKeys<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>>(new int[]{}, ti, true);
+		Assert.assertArrayEquals(new int[] {0,1,2,3,4,5,6,7}, ek.computeLogicalKeyPositions());
+
+	}
 }