You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2015/11/27 14:59:39 UTC
flink git commit: [FLINK-3046] Integrate the Either Java type with
the TypeExtractor
Repository: flink
Updated Branches:
refs/heads/master e69d14521 -> f73a12e72
[FLINK-3046] Integrate the Either Java type with the TypeExtractor
This closes #1393.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f73a12e7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f73a12e7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f73a12e7
Branch: refs/heads/master
Commit: f73a12e728431d87065f7d0082380a527df39e44
Parents: e69d145
Author: twalthr <tw...@apache.org>
Authored: Mon Nov 23 15:08:47 2015 +0100
Committer: twalthr <tw...@apache.org>
Committed: Fri Nov 27 14:58:13 2015 +0100
----------------------------------------------------------------------
.../api/java/typeutils/EitherTypeInfo.java | 12 +-
.../flink/api/java/typeutils/TypeExtractor.java | 196 ++++++++++++++-----
.../java/type/extractor/TypeExtractorTest.java | 81 +++++++-
3 files changed, 235 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f73a12e7/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
index 40ed0c0..ec7be97 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
@@ -37,7 +37,7 @@ public class EitherTypeInfo<L, R> extends TypeInformation<Either<L, R>> {
private final TypeInformation<R> rightType;
- public EitherTypeInfo(TypeInformation<L> leftType,TypeInformation<R> rightType) {
+ public EitherTypeInfo(TypeInformation<L> leftType, TypeInformation<R> rightType) {
this.leftType = leftType;
this.rightType = rightType;
}
@@ -108,4 +108,14 @@ public class EitherTypeInfo<L, R> extends TypeInformation<Either<L, R>> {
return obj instanceof EitherTypeInfo;
}
+ // --------------------------------------------------------------------------------------------
+
+ public TypeInformation<L> getLeftType() {
+ return leftType;
+ }
+
+ public TypeInformation<R> getRightType() {
+ return rightType;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f73a12e7/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 782d58d..ff6a82c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -420,65 +420,53 @@ public class TypeExtractor {
}
typeHierarchy.add(curT);
-
- ParameterizedType tupleChild = (ParameterizedType) curT;
-
- Type[] subtypes = new Type[tupleChild.getActualTypeArguments().length];
-
- // materialize possible type variables
- for (int i = 0; i < subtypes.length; i++) {
- // materialize immediate TypeVariables
- if (tupleChild.getActualTypeArguments()[i] instanceof TypeVariable<?>) {
- subtypes[i] = materializeTypeVariable(typeHierarchy, (TypeVariable<?>) tupleChild.getActualTypeArguments()[i]);
+
+ // create the type information for the subtypes
+ TypeInformation<?>[] subTypesInfo = createSubTypesInfo(t, (ParameterizedType) curT, typeHierarchy, in1Type, in2Type);
+ // type needs to be treated a pojo due to additional fields
+ if (subTypesInfo == null) {
+ if (t instanceof ParameterizedType) {
+ return (TypeInformation<OUT>) analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), (ParameterizedType) t, in1Type, in2Type);
}
- // class or parameterized type
else {
- subtypes[i] = tupleChild.getActualTypeArguments()[i];
+ return (TypeInformation<OUT>) analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), null, in1Type, in2Type);
}
}
+ // return tuple info
+ return new TupleTypeInfo(typeToClass(t), subTypesInfo);
- TypeInformation<?>[] tupleSubTypes = new TypeInformation<?>[subtypes.length];
- for (int i = 0; i < subtypes.length; i++) {
- ArrayList<Type> subTypeHierarchy = new ArrayList<Type>(typeHierarchy);
- subTypeHierarchy.add(subtypes[i]);
- // sub type could not be determined with materializing
- // try to derive the type info of the TypeVariable from the immediate base child input as a last attempt
- if (subtypes[i] instanceof TypeVariable<?>) {
- tupleSubTypes[i] = createTypeInfoFromInputs((TypeVariable<?>) subtypes[i], subTypeHierarchy, in1Type, in2Type);
-
- // variable could not be determined
- if (tupleSubTypes[i] == null) {
- throw new InvalidTypesException("Type of TypeVariable '" + ((TypeVariable<?>) subtypes[i]).getName() + "' in '"
- + ((TypeVariable<?>) subtypes[i]).getGenericDeclaration()
- + "' could not be determined. This is most likely a type erasure problem. "
- + "The type extraction currently supports types with generic variables only in cases where "
- + "all variables in the return type can be deduced from the input type(s).");
- }
- } else {
- tupleSubTypes[i] = createTypeInfoWithTypeHierarchy(subTypeHierarchy, subtypes[i], in1Type, in2Type);
- }
+ }
+ // check if type is a subclass of Either
+ else if (isClassType(t) && Either.class.isAssignableFrom(typeToClass(t))) {
+ Type curT = t;
+
+ // go up the hierarchy until we reach Either (with or without generics)
+ // collect the types while moving up for a later top-down
+ while (!(isClassType(curT) && typeToClass(curT).equals(Either.class))) {
+ typeHierarchy.add(curT);
+ curT = typeToClass(curT).getGenericSuperclass();
}
-
- Class<?> tAsClass = null;
- if (isClassType(t)) {
- tAsClass = typeToClass(t);
- }
- Preconditions.checkNotNull(tAsClass, "t has a unexpected type");
- // check if the class we assumed to be a Tuple so far is actually a pojo because it contains additional fields.
- // check for additional fields.
- int fieldCount = countFieldsInClass(tAsClass);
- if(fieldCount != tupleSubTypes.length) {
- // the class is not a real tuple because it contains additional fields. treat as a pojo
+
+ // check if Either has generics
+ if (curT instanceof Class<?>) {
+ throw new InvalidTypesException("Either needs to be parameterized by using generics.");
+ }
+
+ typeHierarchy.add(curT);
+
+ // create the type information for the subtypes
+ TypeInformation<?>[] subTypesInfo = createSubTypesInfo(t, (ParameterizedType) curT, typeHierarchy, in1Type, in2Type);
+ // type needs to be treated a pojo due to additional fields
+ if (subTypesInfo == null) {
if (t instanceof ParameterizedType) {
- return (TypeInformation<OUT>) analyzePojo(tAsClass, new ArrayList<Type>(typeHierarchy), (ParameterizedType) t, in1Type, in2Type);
+ return (TypeInformation<OUT>) analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), (ParameterizedType) t, in1Type, in2Type);
}
else {
- return (TypeInformation<OUT>) analyzePojo(tAsClass, new ArrayList<Type>(typeHierarchy), null, in1Type, in2Type);
+ return (TypeInformation<OUT>) analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), null, in1Type, in2Type);
}
}
-
- return new TupleTypeInfo(tAsClass, tupleSubTypes);
-
+ // return either info
+ return (TypeInformation<OUT>) new EitherTypeInfo(subTypesInfo[0], subTypesInfo[1]);
}
// type depends on another type
// e.g. class MyMapper<E> extends MapFunction<String, E>
@@ -675,6 +663,71 @@ public class TypeExtractor {
}
return info;
}
+
+ /**
+ * Creates the TypeInformation for all elements of a type that expects a certain number of
+ * subtypes (e.g. TupleXX or Either).
+ *
+ * @param originalType most concrete subclass
+ * @param definingType type that defines the number of subtypes (e.g. Tuple2 -> 2 subtypes)
+ * @param typeHierarchy necessary for type inference
+ * @param in1Type necessary for type inference
+ * @param in2Type necessary for type inference
+ * @return array containing TypeInformation of sub types or null if definingType contains
+ * more subtypes (fields) that defined
+ */
+ private <IN1, IN2> TypeInformation<?>[] createSubTypesInfo(Type originalType, ParameterizedType definingType,
+ ArrayList<Type> typeHierarchy, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
+ Type[] subtypes = new Type[definingType.getActualTypeArguments().length];
+
+ // materialize possible type variables
+ for (int i = 0; i < subtypes.length; i++) {
+ // materialize immediate TypeVariables
+ if (definingType.getActualTypeArguments()[i] instanceof TypeVariable<?>) {
+ subtypes[i] = materializeTypeVariable(typeHierarchy, (TypeVariable<?>) definingType.getActualTypeArguments()[i]);
+ }
+ // class or parameterized type
+ else {
+ subtypes[i] = definingType.getActualTypeArguments()[i];
+ }
+ }
+
+ TypeInformation<?>[] subTypesInfo = new TypeInformation<?>[subtypes.length];
+ for (int i = 0; i < subtypes.length; i++) {
+ ArrayList<Type> subTypeHierarchy = new ArrayList<Type>(typeHierarchy);
+ subTypeHierarchy.add(subtypes[i]);
+ // sub type could not be determined with materializing
+ // try to derive the type info of the TypeVariable from the immediate base child input as a last attempt
+ if (subtypes[i] instanceof TypeVariable<?>) {
+ subTypesInfo[i] = createTypeInfoFromInputs((TypeVariable<?>) subtypes[i], subTypeHierarchy, in1Type, in2Type);
+
+ // variable could not be determined
+ if (subTypesInfo[i] == null) {
+ throw new InvalidTypesException("Type of TypeVariable '" + ((TypeVariable<?>) subtypes[i]).getName() + "' in '"
+ + ((TypeVariable<?>) subtypes[i]).getGenericDeclaration()
+ + "' could not be determined. This is most likely a type erasure problem. "
+ + "The type extraction currently supports types with generic variables only in cases where "
+ + "all variables in the return type can be deduced from the input type(s).");
+ }
+ } else {
+ subTypesInfo[i] = createTypeInfoWithTypeHierarchy(subTypeHierarchy, subtypes[i], in1Type, in2Type);
+ }
+ }
+
+ Class<?> originalTypeAsClass = null;
+ if (isClassType(originalType)) {
+ originalTypeAsClass = typeToClass(originalType);
+ }
+ Preconditions.checkNotNull(originalTypeAsClass, "originalType has an unexpected type");
+ // check if the class we assumed to conform to the defining type so far is actually a pojo because the
+ // original type contains additional fields.
+ // check for additional fields.
+ int fieldCount = countFieldsInClass(originalTypeAsClass);
+ if(fieldCount > subTypesInfo.length) {
+ return null;
+ }
+ return subTypesInfo;
+ }
// --------------------------------------------------------------------------------------------
// Extract type parameters
@@ -830,9 +883,32 @@ public class TypeExtractor {
}
for (int i = 0; i < subTypes.length; i++) {
- validateInfo(new ArrayList<Type>(typeHierarchy), subTypes[i], ((TupleTypeInfo<?>) typeInfo).getTypeAt(i));
+ validateInfo(new ArrayList<Type>(typeHierarchy), subTypes[i], tti.getTypeAt(i));
}
}
+ // check for Either
+ else if (typeInfo instanceof EitherTypeInfo) {
+ // check if Either at all
+ if (!(isClassType(type) && Either.class.isAssignableFrom(typeToClass(type)))) {
+ throw new InvalidTypesException("Either type expected.");
+ }
+
+ // go up the hierarchy until we reach Either (with or without generics)
+ while (!(isClassType(type) && typeToClass(type).equals(Either.class))) {
+ typeHierarchy.add(type);
+ type = typeToClass(type).getGenericSuperclass();
+ }
+
+ // check if Either has generics
+ if (type instanceof Class<?>) {
+ throw new InvalidTypesException("Parameterized Either type expected.");
+ }
+
+ EitherTypeInfo<?, ?> eti = (EitherTypeInfo<?, ?>) typeInfo;
+ Type[] subTypes = ((ParameterizedType) type).getActualTypeArguments();
+ validateInfo(new ArrayList<Type>(typeHierarchy), subTypes[0], eti.getLeftType());
+ validateInfo(new ArrayList<Type>(typeHierarchy), subTypes[1], eti.getRightType());
+ }
// check for Writable
else if (typeInfo instanceof WritableTypeInfo<?>) {
// check if writable at all
@@ -1224,7 +1300,7 @@ public class TypeExtractor {
if(Writable.class.isAssignableFrom(clazz) && !Writable.class.equals(clazz)) {
return (TypeInformation<OUT>) WritableTypeInfo.getWritableTypeInfo((Class<? extends Writable>) clazz);
}
-
+
// check for basic types
TypeInformation<OUT> basicTypeInfo = BasicTypeInfo.getInfoFor(clazz);
if (basicTypeInfo != null) {
@@ -1245,6 +1321,11 @@ public class TypeExtractor {
throw new InvalidTypesException("Type information extraction for tuples (except Tuple0) cannot be done based on the class.");
}
+ // check for subclasses of Either
+ if (Either.class.isAssignableFrom(clazz)) {
+ throw new InvalidTypesException("Type information extraction for Either cannot be done based on the class.");
+ }
+
// check for Enums
if(Enum.class.isAssignableFrom(clazz)) {
return new EnumTypeInfo(clazz);
@@ -1558,7 +1639,20 @@ public class TypeExtractor {
infos[i] = privateGetForObject(field);
}
return new TupleTypeInfo(value.getClass(), infos);
- } else {
+ }
+ // we can not extract the types from an Either object since it only contains type information
+ // of one type, but from Either classes
+ else if (value instanceof Either) {
+ try {
+ return (TypeInformation<X>) privateCreateTypeInfo(value.getClass());
+ }
+ catch (InvalidTypesException e) {
+ throw new InvalidTypesException("Automatic type extraction is not possible on an Either type "
+ + "as it does not contain information about both possible types. "
+ + "Please specify the types directly.");
+ }
+ }
+ else {
return privateGetForClass((Class<X>) value.getClass(), new ArrayList<Type>());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f73a12e7/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
index eae767d..7abfc76 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
@@ -46,6 +46,8 @@ import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple9;
+import org.apache.flink.api.java.typeutils.Either;
+import org.apache.flink.api.java.typeutils.EitherTypeInfo;
import org.apache.flink.api.java.typeutils.EnumTypeInfo;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.api.java.typeutils.MissingTypeInfo;
@@ -67,8 +69,6 @@ import org.apache.hadoop.io.Writable;
import org.junit.Assert;
import org.junit.Test;
-import javax.xml.bind.TypeConstraintException;
-
public class TypeExtractorTest {
@@ -1829,4 +1829,81 @@ public class TypeExtractorTest {
TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction)function, BasicTypeInfo.INT_TYPE_INFO);
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, ti);
}
+
+ public static class Either1<T> extends Either<String, T> {
+ @Override
+ public String left() throws IllegalStateException {
+ return null;
+ }
+
+ @Override
+ public T right() throws IllegalStateException {
+ return null;
+ }
+ }
+
+ public static class Either2 extends Either1<Tuple1<Integer>> {
+ // nothing to do here
+ }
+
+ public static class EitherMapper<T> implements MapFunction<T, Either1<T>> {
+ @Override
+ public Either1<T> map(T value) throws Exception {
+ return null;
+ }
+ }
+
+ public static class EitherMapper2 implements MapFunction<String, Either2> {
+ @Override
+ public Either2 map(String value) throws Exception {
+ return null;
+ }
+ }
+
+ public static class EitherMapper3 implements MapFunction<Either2, Either2> {
+ @Override
+ public Either2 map(Either2 value) throws Exception {
+ return null;
+ }
+ }
+
+ @Test
+ public void testEither() {
+ MapFunction<?, ?> function = new MapFunction<Either<String, Boolean>, Either<String, Boolean>>() {
+ @Override
+ public Either<String, Boolean> map(Either<String, Boolean> value) throws Exception {
+ return null;
+ }
+ };
+ TypeInformation<?> expected = new EitherTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO);
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) function, expected);
+ Assert.assertEquals(expected, ti);
+ }
+
+ @Test
+ public void testEitherHierarchy() {
+ MapFunction<?, ?> function = new EitherMapper<Boolean>();
+ TypeInformation<?> ti = TypeExtractor.getMapReturnTypes((MapFunction) function, BasicTypeInfo.BOOLEAN_TYPE_INFO);
+ TypeInformation<?> expected = new EitherTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO);
+ Assert.assertEquals(expected, ti);
+
+ function = new EitherMapper2();
+ ti = TypeExtractor.getMapReturnTypes((MapFunction) function, BasicTypeInfo.STRING_TYPE_INFO);
+ expected = new EitherTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, new TupleTypeInfo(BasicTypeInfo.INT_TYPE_INFO));
+ Assert.assertEquals(expected, ti);
+
+ function = new EitherMapper3();
+ ti = TypeExtractor.getMapReturnTypes((MapFunction) function, expected);
+ Assert.assertEquals(expected, ti);
+
+ Either<String, Tuple1<Integer>> either = new Either2();
+ ti = TypeExtractor.getForObject(either);
+ Assert.assertEquals(expected, ti);
+ }
+
+ @Test(expected=InvalidTypesException.class)
+ public void testEitherFromObjectException() {
+ Either<String, Tuple1<Integer>> either = Either.left("test");
+ TypeExtractor.getForObject(either);
+ }
}