You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/08/04 21:00:01 UTC
[3/3] flink git commit: [FLINK-2442] [fix] FieldPositionKeys support Pojo fields
[FLINK-2442] [fix] FieldPositionKeys support Pojo fields
This closes #963
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/30761572
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/30761572
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/30761572
Branch: refs/heads/master
Commit: 30761572b5040669b07d261ec9b109797debc549
Parents: b2d8c40
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Jul 30 21:44:06 2015 +0200
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Aug 4 18:16:30 2015 +0200
----------------------------------------------------------------------
.../apache/flink/api/java/operators/Keys.java | 50 ++++++++++----------
.../api/java/typeutils/TupleTypeInfoBase.java | 20 --------
.../flink/api/java/operators/KeysTest.java | 27 +++++++++++
3 files changed, 52 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/30761572/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
index 69d306f..09874e5 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Keys.java
@@ -223,43 +223,43 @@ public abstract class Keys<T> {
} else {
groupingFields = rangeCheckFields(groupingFields, type.getArity() -1);
}
- CompositeType<?> compositeType = (CompositeType<?>) type;
Preconditions.checkArgument(groupingFields.length > 0, "Grouping fields can not be empty at this point");
keyFields = new ArrayList<FlatFieldDescriptor>(type.getTotalFields());
// for each key, find the field:
for(int j = 0; j < groupingFields.length; j++) {
+ int keyPos = groupingFields[j];
+
+ int offset = 0;
for(int i = 0; i < type.getArity(); i++) {
- TypeInformation<?> fieldType = compositeType.getTypeAt(i);
-
- if(groupingFields[j] == i) { // check if user set the key
- int keyId = countNestedElementsBefore(compositeType, i) + i;
- if(fieldType instanceof TupleTypeInfoBase) {
- TupleTypeInfoBase<?> tupleFieldType = (TupleTypeInfoBase<?>) fieldType;
- tupleFieldType.addAllFields(keyId, keyFields);
- } else {
- Preconditions.checkArgument(fieldType instanceof AtomicType, "Wrong field type");
- keyFields.add(new FlatFieldDescriptor(keyId, fieldType));
+
+ TypeInformation fieldType = ((CompositeType<?>) type).getTypeAt(i);
+ if(i < keyPos) {
+ // not yet there, increment key offset
+ offset += fieldType.getTotalFields();
+ }
+ else {
+ // arrived at key position
+ if(fieldType instanceof CompositeType) {
+ // add all nested fields of composite type
+ ((CompositeType) fieldType).getFlatFields("*", offset, keyFields);
}
-
+ else if(fieldType instanceof AtomicType) {
+ // add atomic type field
+ keyFields.add(new FlatFieldDescriptor(offset, fieldType));
+ }
+ else {
+ // type should either be composite or atomic
+ throw new InvalidProgramException("Field type is neither CompositeType nor AtomicType: "+fieldType);
+ }
+ // go to next key
+ break;
}
}
}
keyFields = removeNullElementsFromList(keyFields);
}
-
- private static int countNestedElementsBefore(CompositeType<?> compositeType, int pos) {
- if( pos == 0) {
- return 0;
- }
- int ret = 0;
- for (int i = 0; i < pos; i++) {
- TypeInformation<?> fieldType = compositeType.getTypeAt(i);
- ret += fieldType.getTotalFields() -1;
- }
- return ret;
- }
-
+
public static <R> List<R> removeNullElementsFromList(List<R> in) {
List<R> elements = new ArrayList<R>();
for(R e: in) {
http://git-wip-us.apache.org/repos/asf/flink/blob/30761572/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
index 3314ca9..881e690 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
@@ -23,7 +23,6 @@ import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import org.apache.flink.api.common.typeinfo.AtomicType;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
@@ -88,25 +87,6 @@ public abstract class TupleTypeInfoBase<T> extends CompositeType<T> {
return tupleType;
}
- /**
- * Recursively add all fields in this tuple type. We need this in particular to get all
- * the types.
- * @param startKeyId
- * @param keyFields
- */
- public void addAllFields(int startKeyId, List<FlatFieldDescriptor> keyFields) {
- for(int i = 0; i < this.getArity(); i++) {
- TypeInformation<?> type = this.types[i];
- if(type instanceof AtomicType) {
- keyFields.add(new FlatFieldDescriptor(startKeyId, type));
- } else if(type instanceof TupleTypeInfoBase<?>) {
- TupleTypeInfoBase<?> ttb = (TupleTypeInfoBase<?>) type;
- ttb.addAllFields(startKeyId, keyFields);
- }
- startKeyId += type.getTotalFields();
- }
- }
-
@Override
public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) {
http://git-wip-us.apache.org/repos/asf/flink/blob/30761572/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java
index 67d0240..cf8936d 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/KeysTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.api.java.type.extractor.PojoTypeExtractionTest.ComplexNestedClass;
+import org.apache.flink.api.java.typeutils.PojoTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.junit.Assert;
@@ -254,4 +255,30 @@ public class KeysTest {
ek = new ExpressionKeys<PojoWithMultiplePojos>(new String[]{"i0"}, ti);
Assert.assertArrayEquals(new int[] {0}, ek.computeLogicalKeyPositions());
}
+
+ @Test
+ public void testTupleWithNestedPojo() {
+
+ TypeInformation<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>> ti =
+ new TupleTypeInfo<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>>(
+ BasicTypeInfo.INT_TYPE_INFO,
+ TypeExtractor.getForClass(Pojo1.class),
+ TypeExtractor.getForClass(PojoWithMultiplePojos.class)
+ );
+
+ ExpressionKeys<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>> ek;
+
+ ek = new ExpressionKeys<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>>(new int[]{0}, ti);
+ Assert.assertArrayEquals(new int[] {0}, ek.computeLogicalKeyPositions());
+
+ ek = new ExpressionKeys<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>>(new int[]{1}, ti);
+ Assert.assertArrayEquals(new int[] {1,2}, ek.computeLogicalKeyPositions());
+
+ ek = new ExpressionKeys<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>>(new int[]{2}, ti);
+ Assert.assertArrayEquals(new int[] {3,4,5,6,7}, ek.computeLogicalKeyPositions());
+
+ ek = new ExpressionKeys<Tuple3<Integer, Pojo1, PojoWithMultiplePojos>>(new int[]{}, ti, true);
+ Assert.assertArrayEquals(new int[] {0,1,2,3,4,5,6,7}, ek.computeLogicalKeyPositions());
+
+ }
}