You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/01/10 14:25:44 UTC
flink git commit: [FLINK-4673] [core] TypeInfoFactory for Either type
Repository: flink
Updated Branches:
refs/heads/master f11447e58 -> d4d7cc326
[FLINK-4673] [core] TypeInfoFactory for Either type
Removes from TypeExtractor the explicit parsing for Either and adds an
EitherTypeInfoFactory.
This closes #2545.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d4d7cc32
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d4d7cc32
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d4d7cc32
Branch: refs/heads/master
Commit: d4d7cc32667016d66c65a7d64601cabd101a0c4d
Parents: f11447e
Author: Greg Hogan <co...@greghogan.com>
Authored: Fri Sep 23 14:59:36 2016 -0400
Committer: twalthr <tw...@apache.org>
Committed: Tue Jan 10 14:52:38 2017 +0100
----------------------------------------------------------------------
.../api/java/typeutils/EitherTypeInfo.java | 20 +++-
.../java/typeutils/EitherTypeInfoFactory.java | 48 ++++++++
.../flink/api/java/typeutils/TypeExtractor.java | 120 ++++---------------
.../java/org/apache/flink/types/Either.java | 3 +
4 files changed, 91 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d4d7cc32/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
index 058de12..d4d1aa9 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfo.java
@@ -18,14 +18,19 @@
package org.apache.flink.api.java.typeutils;
-import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
import org.apache.flink.types.Either;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* A {@link TypeInformation} for the {@link Either} type of the Java API.
*
@@ -43,8 +48,8 @@ public class EitherTypeInfo<L, R> extends TypeInformation<Either<L, R>> {
@PublicEvolving
public EitherTypeInfo(TypeInformation<L> leftType, TypeInformation<R> rightType) {
- this.leftType = leftType;
- this.rightType = rightType;
+ this.leftType = checkNotNull(leftType);
+ this.rightType = checkNotNull(rightType);
}
@Override
@@ -80,6 +85,15 @@ public class EitherTypeInfo<L, R> extends TypeInformation<Either<L, R>> {
@Override
@PublicEvolving
+ public Map<String, TypeInformation<?>> getGenericParameters() {
+ Map<String, TypeInformation<?>> m = new HashMap<>();
+ m.put("L", this.leftType);
+ m.put("R", this.rightType);
+ return m;
+ }
+
+ @Override
+ @PublicEvolving
public boolean isKeyType() {
return false;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d4d7cc32/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfoFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfoFactory.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfoFactory.java
new file mode 100644
index 0000000..be881a7
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/EitherTypeInfoFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils;
+
+import org.apache.flink.api.common.functions.InvalidTypesException;
+import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.types.Either;
+
+import java.lang.reflect.Type;
+import java.util.Map;
+
+public class EitherTypeInfoFactory<L, R> extends TypeInfoFactory<Either<L, R>> {
+
+ @Override
+ public TypeInformation<Either<L, R>> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
+ TypeInformation<?> leftType = genericParameters.get("L");
+ TypeInformation<?> rightType = genericParameters.get("R");
+
+ if (leftType == null) {
+ throw new InvalidTypesException("Type extraction is not possible on Either" +
+ " type as it does not contain information about the 'left' type.");
+ }
+
+ if (rightType == null) {
+ throw new InvalidTypesException("Type extraction is not possible on Either" +
+ " type as it does not contain information about the 'right' type.");
+ }
+
+ return new EitherTypeInfo(leftType, rightType);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d4d7cc32/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 08f8c53..df4a0e0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -18,28 +18,11 @@
package org.apache.flink.api.java.typeutils;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.Field;
-import java.lang.reflect.GenericArrayType;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Modifier;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
-import java.lang.reflect.TypeVariable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.HashMap;
-import java.util.List;
-
-import java.util.Map;
import org.apache.avro.specific.SpecificRecordBase;
-
import org.apache.commons.lang3.ClassUtils;
-import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;
@@ -66,19 +49,34 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple0;
import org.apache.flink.api.java.typeutils.TypeExtractionUtils.LambdaExecutable;
-import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.checkAndExtractLambda;
-import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.getAllDeclaredMethods;
-import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.isClassType;
-import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.sameTypeVars;
-import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.typeToClass;
-import org.apache.flink.types.Either;
import org.apache.flink.types.Value;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.GenericArrayType;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
+import java.lang.reflect.TypeVariable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.checkAndExtractLambda;
+import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.getAllDeclaredMethods;
+import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.isClassType;
+import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.sameTypeVars;
+import static org.apache.flink.api.java.typeutils.TypeExtractionUtils.typeToClass;
/**
* A utility for reflection analysis on classes, to determine the return type of implementations of transformation
@@ -690,38 +688,6 @@ public class TypeExtractor {
return new TupleTypeInfo(typeToClass(t), subTypesInfo);
}
- // check if type is a subclass of Either
- else if (isClassType(t) && Either.class.isAssignableFrom(typeToClass(t))) {
- Type curT = t;
-
- // go up the hierarchy until we reach Either (with or without generics)
- // collect the types while moving up for a later top-down
- while (!(isClassType(curT) && typeToClass(curT).equals(Either.class))) {
- typeHierarchy.add(curT);
- curT = typeToClass(curT).getGenericSuperclass();
- }
-
- // check if Either has generics
- if (curT instanceof Class<?>) {
- throw new InvalidTypesException("Either needs to be parameterized by using generics.");
- }
-
- typeHierarchy.add(curT);
-
- // create the type information for the subtypes
- final TypeInformation<?>[] subTypesInfo = createSubTypesInfo(t, (ParameterizedType) curT, typeHierarchy, in1Type, in2Type, false);
- // type needs to be treated a pojo due to additional fields
- if (subTypesInfo == null) {
- if (t instanceof ParameterizedType) {
- return (TypeInformation<OUT>) analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), (ParameterizedType) t, in1Type, in2Type);
- }
- else {
- return (TypeInformation<OUT>) analyzePojo(typeToClass(t), new ArrayList<Type>(typeHierarchy), null, in1Type, in2Type);
- }
- }
- // return either info
- return (TypeInformation<OUT>) new EitherTypeInfo(subTypesInfo[0], subTypesInfo[1]);
- }
// type depends on another type
// e.g. class MyMapper<E> extends MapFunction<String, E>
else if (t instanceof TypeVariable) {
@@ -947,7 +913,7 @@ public class TypeExtractor {
/**
* Creates the TypeInformation for all elements of a type that expects a certain number of
- * subtypes (e.g. TupleXX or Either).
+ * subtypes (e.g. TupleXX).
*
* @param originalType most concrete subclass
* @param definingType type that defines the number of subtypes (e.g. Tuple2 -> 2 subtypes)
@@ -1234,29 +1200,6 @@ public class TypeExtractor {
validateInfo(new ArrayList<Type>(typeHierarchy), subTypes[i], tti.getTypeAt(i));
}
}
- // check for Either
- else if (typeInfo instanceof EitherTypeInfo) {
- // check if Either at all
- if (!(isClassType(type) && Either.class.isAssignableFrom(typeToClass(type)))) {
- throw new InvalidTypesException("Either type expected.");
- }
-
- // go up the hierarchy until we reach Either (with or without generics)
- while (!(isClassType(type) && typeToClass(type).equals(Either.class))) {
- typeHierarchy.add(type);
- type = typeToClass(type).getGenericSuperclass();
- }
-
- // check if Either has generics
- if (type instanceof Class<?>) {
- throw new InvalidTypesException("Parameterized Either type expected.");
- }
-
- EitherTypeInfo<?, ?> eti = (EitherTypeInfo<?, ?>) typeInfo;
- Type[] subTypes = ((ParameterizedType) type).getActualTypeArguments();
- validateInfo(new ArrayList<Type>(typeHierarchy), subTypes[0], eti.getLeftType());
- validateInfo(new ArrayList<Type>(typeHierarchy), subTypes[1], eti.getRightType());
- }
// check for primitive array
else if (typeInfo instanceof PrimitiveArrayTypeInfo) {
Type component;
@@ -1675,11 +1618,6 @@ public class TypeExtractor {
throw new InvalidTypesException("Type information extraction for tuples (except Tuple0) cannot be done based on the class.");
}
- // check for subclasses of Either
- if (Either.class.isAssignableFrom(clazz)) {
- throw new InvalidTypesException("Type information extraction for Either cannot be done based on the class.");
- }
-
// check for Enums
if(Enum.class.isAssignableFrom(clazz)) {
return new EnumTypeInfo(clazz);
@@ -1956,18 +1894,6 @@ public class TypeExtractor {
}
return new TupleTypeInfo(value.getClass(), infos);
}
- // we can not extract the types from an Either object since it only contains type information
- // of one type, but from Either classes
- else if (value instanceof Either) {
- try {
- return (TypeInformation<X>) privateCreateTypeInfo(value.getClass());
- }
- catch (InvalidTypesException e) {
- throw new InvalidTypesException("Automatic type extraction is not possible on an Either type "
- + "as it does not contain information about both possible types. "
- + "Please specify the types directly.");
- }
- }
else {
return privateGetForClass((Class<X>) value.getClass(), new ArrayList<Type>());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d4d7cc32/flink-core/src/main/java/org/apache/flink/types/Either.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/Either.java b/flink-core/src/main/java/org/apache/flink/types/Either.java
index d61b228..a08e968 100644
--- a/flink-core/src/main/java/org/apache/flink/types/Either.java
+++ b/flink-core/src/main/java/org/apache/flink/types/Either.java
@@ -19,6 +19,8 @@
package org.apache.flink.types;
import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.typeinfo.TypeInfo;
+import org.apache.flink.api.java.typeutils.EitherTypeInfoFactory;
/**
* This type represents a value of one of two possible types, Left or Right (a
@@ -30,6 +32,7 @@ import org.apache.flink.annotation.Public;
* the type of Right
*/
@Public
+@TypeInfo(EitherTypeInfoFactory.class)
public abstract class Either<L, R> {
/**