You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2015/09/17 14:04:05 UTC
[3/7] flink git commit: [FLINK-2637] [api-breaking] [types] Adds
equals and hashCode method to TypeInformations and TypeSerializers - Fixes
ObjectArrayTypeInfo - Makes CompositeTypes serializable - Adds test for
equality relation's symmetric property - A
http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
index 024eb71..de59c36 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/EnumTypeInfo.java
@@ -18,6 +18,7 @@
package org.apache.flink.api.java.typeutils;
+import com.google.common.base.Preconditions;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.AtomicType;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -38,12 +39,12 @@ public class EnumTypeInfo<T extends Enum<T>> extends TypeInformation<T> implemen
private final Class<T> typeClass;
public EnumTypeInfo(Class<T> typeClass) {
- if (typeClass == null) {
- throw new NullPointerException();
- }
+ Preconditions.checkNotNull(typeClass, "Enum type class must not be null.");
+
if (!Enum.class.isAssignableFrom(typeClass) ) {
throw new IllegalArgumentException("EnumTypeInfo can only be used for subclasses of " + Enum.class.getName());
}
+
this.typeClass = typeClass;
}
@@ -98,13 +99,22 @@ public class EnumTypeInfo<T extends Enum<T>> extends TypeInformation<T> implemen
@Override
public int hashCode() {
- return typeClass.hashCode() ^ 0xd3a2646c;
+ return typeClass.hashCode();
}
-
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof EnumTypeInfo;
+ }
+
@Override
public boolean equals(Object obj) {
if (obj instanceof EnumTypeInfo) {
- return typeClass == ((EnumTypeInfo<?>) obj).typeClass;
+ @SuppressWarnings("unchecked")
+ EnumTypeInfo<T> enumTypeInfo = (EnumTypeInfo<T>) obj;
+
+ return enumTypeInfo.canEqual(this) &&
+ typeClass == enumTypeInfo.typeClass;
} else {
return false;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
index 5caf8f2..7e7aa68 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/GenericTypeInfo.java
@@ -18,6 +18,7 @@
package org.apache.flink.api.java.typeutils;
+import com.google.common.base.Preconditions;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.AtomicType;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -34,7 +35,7 @@ public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType
private final Class<T> typeClass;
public GenericTypeInfo(Class<T> typeClass) {
- this.typeClass = typeClass;
+ this.typeClass = Preconditions.checkNotNull(typeClass);
}
@Override
@@ -88,13 +89,21 @@ public class GenericTypeInfo<T> extends TypeInformation<T> implements AtomicType
@Override
public int hashCode() {
- return typeClass.hashCode() ^ 0x165667b1;
+ return typeClass.hashCode();
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof GenericTypeInfo;
}
@Override
public boolean equals(Object obj) {
- if (obj.getClass() == GenericTypeInfo.class) {
- return typeClass == ((GenericTypeInfo<?>) obj).typeClass;
+ if (obj instanceof GenericTypeInfo) {
+ @SuppressWarnings("unchecked")
+ GenericTypeInfo<T> genericTypeInfo = (GenericTypeInfo<T>) obj;
+
+ return typeClass == genericTypeInfo.typeClass;
} else {
return false;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java
index 227c68c..1dd7f01 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/MissingTypeInfo.java
@@ -31,8 +31,8 @@ public class MissingTypeInfo extends TypeInformation<InvalidTypesException> {
private static final long serialVersionUID = -4212082837126702723L;
- private String functionName;
- private InvalidTypesException typeException;
+ private final String functionName;
+ private final InvalidTypesException typeException;
public MissingTypeInfo(String functionName) {
@@ -87,6 +87,34 @@ public class MissingTypeInfo extends TypeInformation<InvalidTypesException> {
}
@Override
+ public String toString() {
+ return getClass().getSimpleName() + "<" + functionName + ", " + typeException.getMessage() + ">";
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof MissingTypeInfo) {
+ MissingTypeInfo missingTypeInfo = (MissingTypeInfo) obj;
+
+ return missingTypeInfo.canEqual(this) &&
+ functionName.equals(missingTypeInfo.functionName) &&
+ typeException.equals(missingTypeInfo.typeException);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return 31 * functionName.hashCode() + typeException.hashCode();
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof MissingTypeInfo;
+ }
+
+ @Override
public int getTotalFields() {
throw new UnsupportedOperationException("The missing type information cannot be used as a type information.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
index 6806122..150c976 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ObjectArrayTypeInfo.java
@@ -19,13 +19,9 @@
package org.apache.flink.api.java.typeutils;
import java.lang.reflect.Array;
-import java.lang.reflect.GenericArrayType;
-import java.lang.reflect.ParameterizedType;
-import java.lang.reflect.Type;
+import com.google.common.base.Preconditions;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.InvalidTypesException;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.GenericArraySerializer;
@@ -34,21 +30,12 @@ public class ObjectArrayTypeInfo<T, C> extends TypeInformation<T> {
private static final long serialVersionUID = 1L;
- private final Type arrayType;
- private final Type componentType;
+ private final Class<T> arrayType;
private final TypeInformation<C> componentInfo;
- @SuppressWarnings("unchecked")
- private ObjectArrayTypeInfo(Type arrayType, Type componentType) {
- this.arrayType = arrayType;
- this.componentType = componentType;
- this.componentInfo = (TypeInformation<C>) TypeExtractor.createTypeInfo(componentType);
- }
-
- private ObjectArrayTypeInfo(Type arrayType, Type componentType, TypeInformation<C> componentInfo) {
- this.arrayType = arrayType;
- this.componentType = componentType;
- this.componentInfo = componentInfo;
+ private ObjectArrayTypeInfo(Class<T> arrayType, TypeInformation<C> componentInfo) {
+ this.arrayType = Preconditions.checkNotNull(arrayType);
+ this.componentInfo = Preconditions.checkNotNull(componentInfo);
}
// --------------------------------------------------------------------------------------------
@@ -76,29 +63,9 @@ public class ObjectArrayTypeInfo<T, C> extends TypeInformation<T> {
@SuppressWarnings("unchecked")
@Override
public Class<T> getTypeClass() {
- // if arrayType is a Class
- if (arrayType instanceof Class) {
- return (Class<T>) arrayType;
- }
-
- // if arrayType is a GenericArrayType
- Class<?> componentClass = (Class<?>) ((ParameterizedType) componentType).getRawType();
-
- try {
- return (Class<T>) Class.forName("[L" + componentClass.getName() + ";");
- } catch (ClassNotFoundException e) {
- throw new RuntimeException("Cannot create non-generic type class.", e);
- }
- }
-
- public Type getType() {
return arrayType;
}
- public Type getComponentType() {
- return this.componentType;
- }
-
public TypeInformation<C> getComponentInfo() {
return componentInfo;
}
@@ -111,15 +78,9 @@ public class ObjectArrayTypeInfo<T, C> extends TypeInformation<T> {
@SuppressWarnings("unchecked")
@Override
public TypeSerializer<T> createSerializer(ExecutionConfig executionConfig) {
- // use raw type for serializer if generic array type
- if (this.componentType instanceof GenericArrayType) {
- ParameterizedType paramType = (ParameterizedType) ((GenericArrayType) this.componentType).getGenericComponentType();
-
- return (TypeSerializer<T>) new GenericArraySerializer<C>((Class<C>) paramType.getRawType(),
- this.componentInfo.createSerializer(executionConfig));
- } else {
- return (TypeSerializer<T>) new GenericArraySerializer<C>((Class<C>) this.componentType, this.componentInfo.createSerializer(executionConfig));
- }
+ return (TypeSerializer<T>) new GenericArraySerializer<C>(
+ componentInfo.getTypeClass(),
+ componentInfo.createSerializer(executionConfig));
}
@Override
@@ -128,38 +89,37 @@ public class ObjectArrayTypeInfo<T, C> extends TypeInformation<T> {
}
@Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
+ public boolean equals(Object obj) {
+ if (obj instanceof ObjectArrayTypeInfo) {
+ @SuppressWarnings("unchecked")
+ ObjectArrayTypeInfo<T, C> objectArrayTypeInfo = (ObjectArrayTypeInfo<T, C>)obj;
+
+ return objectArrayTypeInfo.canEqual(this) &&
+ arrayType == objectArrayTypeInfo.arrayType &&
+ componentInfo.equals(objectArrayTypeInfo.componentInfo);
+ } else {
return false;
}
+ }
- ObjectArrayTypeInfo<?, ?> that = (ObjectArrayTypeInfo<?, ?>) o;
- return this.arrayType.equals(that.arrayType) && this.componentType.equals(that.componentType);
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof ObjectArrayTypeInfo;
}
@Override
public int hashCode() {
- return 31 * this.arrayType.hashCode() + this.componentType.hashCode();
+ return 31 * this.arrayType.hashCode() + this.componentInfo.hashCode();
}
// --------------------------------------------------------------------------------------------
- public static <T, C> ObjectArrayTypeInfo<T, C> getInfoFor(Type type, TypeInformation<C> componentInfo) {
- // generic array type e.g. for Tuples
- if (type instanceof GenericArrayType) {
- GenericArrayType genericArray = (GenericArrayType) type;
- return new ObjectArrayTypeInfo<T, C>(type, genericArray.getGenericComponentType(), componentInfo);
- }
- // for tuples without generics (e.g. generated by the TypeInformation parser)
- // and multidimensional arrays (e.g. in scala)
- else if (type instanceof Class<?> && ((Class<?>) type).isArray()
- && BasicTypeInfo.getInfoFor((Class<?>) type) == null) {
- return new ObjectArrayTypeInfo<T, C>(type, ((Class<?>) type).getComponentType(), componentInfo);
- }
- throw new InvalidTypesException("The given type is not a valid object array.");
+ public static <T, C> ObjectArrayTypeInfo<T, C> getInfoFor(Class<T> arrayClass, TypeInformation<C> componentInfo) {
+ Preconditions.checkNotNull(arrayClass);
+ Preconditions.checkNotNull(componentInfo);
+ Preconditions.checkArgument(arrayClass.isArray(), "Class " + arrayClass + " must be an array.");
+
+ return new ObjectArrayTypeInfo<T, C>(arrayClass, componentInfo);
}
/**
@@ -170,20 +130,12 @@ public class ObjectArrayTypeInfo<T, C> extends TypeInformation<T> {
* This must be used in cases where the complete type of the array is not available as a
* {@link java.lang.reflect.Type} or {@link java.lang.Class}.
*/
+ @SuppressWarnings("unchecked")
public static <T, C> ObjectArrayTypeInfo<T, C> getInfoFor(TypeInformation<C> componentInfo) {
- return new ObjectArrayTypeInfo<T, C>(
- Array.newInstance(componentInfo.getTypeClass(), 0).getClass(),
- componentInfo.getTypeClass(),
- componentInfo);
- }
+ Preconditions.checkNotNull(componentInfo);
- @SuppressWarnings("unchecked")
- public static <T, C> ObjectArrayTypeInfo<T, C> getInfoFor(Type type) {
- // class type e.g. for POJOs
- if (type instanceof Class<?> && ((Class<?>) type).isArray() && BasicTypeInfo.getInfoFor((Class<C>) type) == null) {
- Class<C> array = (Class<C>) type;
- return new ObjectArrayTypeInfo<T, C>(type, array.getComponentType());
- }
- throw new InvalidTypesException("The given type is not a valid object array.");
+ return new ObjectArrayTypeInfo<T, C>(
+ (Class<T>)Array.newInstance(componentInfo.getTypeClass(), 0).getClass(),
+ componentInfo);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
index 91a7a5e..4ad0ac2 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoField.java
@@ -23,16 +23,29 @@ import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.reflect.Field;
+import java.util.Objects;
+import com.google.common.base.Preconditions;
import org.apache.flink.api.common.typeinfo.TypeInformation;
public class PojoField implements Serializable {
- public transient Field field;
- public TypeInformation<?> type;
+
+ private static final long serialVersionUID = 1975295846436559363L;
+
+ private transient Field field;
+ private final TypeInformation<?> type;
public PojoField(Field field, TypeInformation<?> type) {
- this.field = field;
- this.type = type;
+ this.field = Preconditions.checkNotNull(field);
+ this.type = Preconditions.checkNotNull(type);
+ }
+
+ public Field getField() {
+ return field;
+ }
+
+ public TypeInformation<?> getTypeInformation() {
+ return type;
}
private void writeObject(ObjectOutputStream out)
@@ -68,4 +81,25 @@ public class PojoField implements Serializable {
public String toString() {
return "PojoField " + field.getDeclaringClass() + "." + field.getName() + " (" + type + ")";
}
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof PojoField) {
+ PojoField other = (PojoField) obj;
+
+ return other.canEqual(this) && type.equals(other.type) &&
+ Objects.equals(field, other.field);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(field, type);
+ }
+
+ public boolean canEqual(Object obj) {
+ return obj instanceof PojoField;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
index 273a907..f7e4e42 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java
@@ -22,12 +22,12 @@ import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import com.google.common.base.Preconditions;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
@@ -40,8 +40,6 @@ import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
import com.google.common.base.Joiner;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* TypeInformation for "Java Beans"-style types. Flink refers to them as POJOs,
@@ -59,8 +57,6 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
private static final long serialVersionUID = 1L;
- private static final Logger LOG = LoggerFactory.getLogger(PojoTypeInfo.class);
-
private final static String REGEX_FIELD = "[\\p{L}_\\$][\\p{L}\\p{Digit}_\\$]*";
private final static String REGEX_NESTED_FIELDS = "("+REGEX_FIELD+")(\\.(.+))?";
private final static String REGEX_NESTED_FIELDS_WILDCARD = REGEX_NESTED_FIELDS
@@ -70,31 +66,32 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
private static final Pattern PATTERN_NESTED_FIELDS = Pattern.compile(REGEX_NESTED_FIELDS);
private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
- private final Class<T> typeClass;
-
- private PojoField[] fields;
+ private final PojoField[] fields;
- private int totalFields;
+ private final int totalFields;
public PojoTypeInfo(Class<T> typeClass, List<PojoField> fields) {
super(typeClass);
- this.typeClass = typeClass;
- List<PojoField> tempFields = new ArrayList<PojoField>(fields);
- Collections.sort(tempFields, new Comparator<PojoField>() {
+
+ Preconditions.checkArgument(Modifier.isPublic(typeClass.getModifiers()),
+ "POJO " + typeClass + " is not public");
+
+ this.fields = fields.toArray(new PojoField[fields.size()]);
+
+ Arrays.sort(this.fields, new Comparator<PojoField>() {
@Override
public int compare(PojoField o1, PojoField o2) {
- return o1.field.getName().compareTo(o2.field.getName());
+ return o1.getField().getName().compareTo(o2.getField().getName());
}
});
- this.fields = tempFields.toArray(new PojoField[tempFields.size()]);
-
- // check if POJO is public
- if(!Modifier.isPublic(typeClass.getModifiers())) {
- throw new RuntimeException("POJO "+typeClass+" is not public");
- }
+
+ int counterFields = 0;
+
for(PojoField field : fields) {
- totalFields += field.type.getTotalFields();
+ counterFields += field.getTypeInformation().getTotalFields();
}
+
+ totalFields = counterFields;
}
@Override
@@ -119,11 +116,6 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
}
@Override
- public Class<T> getTypeClass() {
- return typeClass;
- }
-
- @Override
public boolean isSortKeyType() {
// Support for sorting POJOs that implement Comparable is not implemented yet.
// Since the order of fields in a POJO type is not well defined, sorting on fields
@@ -145,12 +137,16 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
// handle select all
int keyPosition = 0;
for(PojoField pField : fields) {
- if(pField.type instanceof CompositeType) {
- CompositeType<?> cType = (CompositeType<?>)pField.type;
+ if(pField.getTypeInformation() instanceof CompositeType) {
+ CompositeType<?> cType = (CompositeType<?>)pField.getTypeInformation();
cType.getFlatFields(String.valueOf(ExpressionKeys.SELECT_ALL_CHAR), offset + keyPosition, result);
keyPosition += cType.getTotalFields()-1;
} else {
- result.add(new NamedFlatFieldDescriptor(pField.field.getName(), offset + keyPosition, pField.type));
+ result.add(
+ new NamedFlatFieldDescriptor(
+ pField.getField().getName(),
+ offset + keyPosition,
+ pField.getTypeInformation()));
}
keyPosition++;
}
@@ -163,9 +159,9 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
int fieldPos = -1;
TypeInformation<?> fieldType = null;
for (int i = 0; i < fields.length; i++) {
- if (fields[i].field.getName().equals(field)) {
+ if (fields[i].getField().getName().equals(field)) {
fieldPos = i;
- fieldType = fields[i].type;
+ fieldType = fields[i].getTypeInformation();
break;
}
}
@@ -181,7 +177,6 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
}
// add all fields of composite type
((CompositeType<?>) fieldType).getFlatFields("*", offset, result);
- return;
} else {
// we found the field to add
// compute flat field position by adding skipped fields
@@ -190,8 +185,6 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
flatFieldPos += this.getTypeAt(i).getTotalFields();
}
result.add(new FlatFieldDescriptor(flatFieldPos, fieldType));
- // nothing left to do
- return;
}
} else {
if(fieldType instanceof CompositeType<?>) {
@@ -200,8 +193,6 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
offset += this.getTypeAt(i).getTotalFields();
}
((CompositeType<?>) fieldType).getFlatFields(tail, offset, result);
- // nothing left to do
- return;
} else {
throw new InvalidFieldReferenceException("Nested field expression \""+tail+"\" not possible on atomic type "+fieldType+".");
}
@@ -226,9 +217,9 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
int fieldPos = -1;
TypeInformation<?> fieldType = null;
for (int i = 0; i < fields.length; i++) {
- if (fields[i].field.getName().equals(field)) {
+ if (fields[i].getField().getName().equals(field)) {
fieldPos = i;
- fieldType = fields[i].type;
+ fieldType = fields[i].getTypeInformation();
break;
}
}
@@ -255,10 +246,15 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
throw new IndexOutOfBoundsException();
}
@SuppressWarnings("unchecked")
- TypeInformation<X> typed = (TypeInformation<X>) fields[pos].type;
+ TypeInformation<X> typed = (TypeInformation<X>) fields[pos].getTypeInformation();
return typed;
}
+ @Override
+ protected TypeComparatorBuilder<T> createTypeComparatorBuilder() {
+ return new PojoTypeComparatorBuilder();
+ }
+
// used for testing. Maybe use mockito here
public PojoField getPojoFieldAt(int pos) {
if (pos < 0 || pos >= this.fields.length) {
@@ -267,42 +263,10 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
return this.fields[pos];
}
- /**
- * Comparator creation
- */
- private TypeComparator<?>[] fieldComparators;
- private Field[] keyFields;
- private int comparatorHelperIndex = 0;
- @Override
- protected void initializeNewComparator(int keyCount) {
- fieldComparators = new TypeComparator<?>[keyCount];
- keyFields = new Field[keyCount];
- comparatorHelperIndex = 0;
- }
-
- @Override
- protected void addCompareField(int fieldId, TypeComparator<?> comparator) {
- fieldComparators[comparatorHelperIndex] = comparator;
- keyFields[comparatorHelperIndex] = fields[fieldId].field;
- comparatorHelperIndex++;
- }
-
- @Override
- protected TypeComparator<T> getNewComparator(ExecutionConfig config) {
- // first remove the null array fields
- final Field[] finalKeyFields = Arrays.copyOf(keyFields, comparatorHelperIndex);
- @SuppressWarnings("rawtypes")
- final TypeComparator[] finalFieldComparators = Arrays.copyOf(fieldComparators, comparatorHelperIndex);
- if(finalFieldComparators.length == 0 || finalKeyFields.length == 0 || finalFieldComparators.length != finalKeyFields.length) {
- throw new IllegalArgumentException("Pojo comparator creation has a bug");
- }
- return new PojoComparator<T>(finalKeyFields, finalFieldComparators, createSerializer(config), typeClass);
- }
-
public String[] getFieldNames() {
String[] result = new String[fields.length];
for (int i = 0; i < fields.length; i++) {
- result[i] = fields[i].field.getName();
+ result[i] = fields[i].getField().getName();
}
return result;
}
@@ -310,7 +274,7 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
@Override
public int getFieldIndex(String fieldName) {
for (int i = 0; i < fields.length; i++) {
- if (fields[i].field.getName().equals(fieldName)) {
+ if (fields[i].getField().getName().equals(fieldName)) {
return i;
}
}
@@ -320,46 +284,106 @@ public class PojoTypeInfo<T> extends CompositeType<T> {
@Override
public TypeSerializer<T> createSerializer(ExecutionConfig config) {
if(config.isForceKryoEnabled()) {
- return new KryoSerializer<T>(this.typeClass, config);
+ return new KryoSerializer<T>(getTypeClass(), config);
}
if(config.isForceAvroEnabled()) {
- return new AvroSerializer<T>(this.typeClass);
+ return new AvroSerializer<T>(getTypeClass());
}
TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[fields.length ];
Field[] reflectiveFields = new Field[fields.length];
for (int i = 0; i < fields.length; i++) {
- fieldSerializers[i] = fields[i].type.createSerializer(config);
- reflectiveFields[i] = fields[i].field;
+ fieldSerializers[i] = fields[i].getTypeInformation().createSerializer(config);
+ reflectiveFields[i] = fields[i].getField();
}
- return new PojoSerializer<T>(this.typeClass, fieldSerializers, reflectiveFields, config);
+ return new PojoSerializer<T>(getTypeClass(), fieldSerializers, reflectiveFields, config);
}
-
- // --------------------------------------------------------------------------------------------
@Override
public boolean equals(Object obj) {
- return (obj instanceof PojoTypeInfo) && ((PojoTypeInfo<?>) obj).typeClass == this.typeClass;
+ if (obj instanceof PojoTypeInfo) {
+ @SuppressWarnings("unchecked")
+ PojoTypeInfo<T> pojoTypeInfo = (PojoTypeInfo<T>)obj;
+
+ return pojoTypeInfo.canEqual(this) &&
+ super.equals(pojoTypeInfo) &&
+ Arrays.equals(fields, pojoTypeInfo.fields) &&
+ totalFields == pojoTypeInfo.totalFields;
+ } else {
+ return false;
+ }
}
@Override
public int hashCode() {
- return typeClass.hashCode() + 1387562934;
+ return 31 * (31 * Arrays.hashCode(fields) + totalFields) + super.hashCode();
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof PojoTypeInfo;
}
@Override
public String toString() {
List<String> fieldStrings = new ArrayList<String>();
for (PojoField field : fields) {
- fieldStrings.add(field.field.getName() + ": " + field.type.toString());
+ fieldStrings.add(field.getField().getName() + ": " + field.getTypeInformation().toString());
}
- return "PojoType<" + typeClass.getName()
+ return "PojoType<" + getTypeClass().getName()
+ ", fields = [" + Joiner.on(", ").join(fieldStrings) + "]"
+ ">";
}
+ // --------------------------------------------------------------------------------------------
+
+ private class PojoTypeComparatorBuilder implements TypeComparatorBuilder<T> {
+
+ private ArrayList<TypeComparator> fieldComparators;
+ private ArrayList<Field> keyFields;
+
+ public PojoTypeComparatorBuilder() {
+ fieldComparators = new ArrayList<TypeComparator>();
+ keyFields = new ArrayList<Field>();
+ }
+
+
+ @Override
+ public void initializeTypeComparatorBuilder(int size) {
+ fieldComparators.ensureCapacity(size);
+ keyFields.ensureCapacity(size);
+ }
+
+ @Override
+ public void addComparatorField(int fieldId, TypeComparator<?> comparator) {
+ fieldComparators.add(comparator);
+ keyFields.add(fields[fieldId].getField());
+ }
+
+ @Override
+ public TypeComparator<T> createTypeComparator(ExecutionConfig config) {
+ Preconditions.checkState(
+ keyFields.size() > 0,
+ "No keys were defined for the PojoTypeComparatorBuilder.");
+
+ Preconditions.checkState(
+ fieldComparators.size() > 0,
+ "No type comparators were defined for the PojoTypeComparatorBuilder.");
+
+ Preconditions.checkState(
+ keyFields.size() == fieldComparators.size(),
+ "Number of key fields and field comparators is not equal.");
+
+ return new PojoComparator<T>(
+ keyFields.toArray(new Field[keyFields.size()]),
+ fieldComparators.toArray(new TypeComparator[fieldComparators.size()]),
+ createSerializer(config),
+ getTypeClass());
+ }
+ }
+
public static class NamedFlatFieldDescriptor extends FlatFieldDescriptor {
private String fieldName;
http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java
index 9857eb6..e9ce102 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/RecordTypeInfo.java
@@ -70,12 +70,22 @@ public class RecordTypeInfo extends TypeInformation<Record> {
@Override
public int hashCode() {
- return Record.class.hashCode() ^ 0x165667b1;
+ return Record.class.hashCode();
}
-
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof RecordTypeInfo;
+ }
+
@Override
public boolean equals(Object obj) {
- return obj.getClass() == RecordTypeInfo.class;
+ if (obj instanceof RecordTypeInfo) {
+ RecordTypeInfo recordTypeInfo = (RecordTypeInfo) obj;
+ return recordTypeInfo.canEqual(this);
+ } else {
+ return false;
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
index 618b190..30710e5 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
@@ -18,8 +18,12 @@
package org.apache.flink.api.java.typeutils;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
+import com.google.common.base.Preconditions;
+import com.google.common.primitives.Ints;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -52,10 +56,13 @@ public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> {
public TupleTypeInfo(Class<T> tupleType, TypeInformation<?>... types) {
super(tupleType, types);
- if (types == null || types.length > Tuple.MAX_ARITY) {
- throw new IllegalArgumentException();
- }
+
+ Preconditions.checkArgument(
+ types.length <= Tuple.MAX_ARITY,
+ "The tuple type exceeds the maximum supported arity.");
+
this.fieldNames = new String[types.length];
+
for (int i = 0; i < types.length; i++) {
fieldNames[i] = "f" + i;
}
@@ -78,7 +85,7 @@ public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> {
@SuppressWarnings("unchecked")
@Override
public TupleSerializer<T> createSerializer(ExecutionConfig executionConfig) {
- if (this.tupleType == Tuple0.class) {
+ if (getTypeClass() == Tuple0.class) {
return (TupleSerializer<T>) Tuple0Serializer.INSTANCE;
}
@@ -91,48 +98,65 @@ public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> {
return new TupleSerializer<T>(tupleClass, fieldSerializers);
}
-
- /**
- * Comparator creation
- */
- private TypeComparator<?>[] fieldComparators;
- private int[] logicalKeyFields;
- private int comparatorHelperIndex = 0;
-
- @Override
- protected void initializeNewComparator(int localKeyCount) {
- fieldComparators = new TypeComparator<?>[localKeyCount];
- logicalKeyFields = new int[localKeyCount];
- comparatorHelperIndex = 0;
- }
@Override
- protected void addCompareField(int fieldId, TypeComparator<?> comparator) {
- fieldComparators[comparatorHelperIndex] = comparator;
- logicalKeyFields[comparatorHelperIndex] = fieldId;
- comparatorHelperIndex++;
+ protected TypeComparatorBuilder<T> createTypeComparatorBuilder() {
+ return new TupleTypeComparatorBuilder();
}
- @Override
- protected TypeComparator<T> getNewComparator(ExecutionConfig executionConfig) {
- @SuppressWarnings("rawtypes")
- final TypeComparator[] finalFieldComparators = Arrays.copyOf(fieldComparators, comparatorHelperIndex);
- final int[] finalLogicalKeyFields = Arrays.copyOf(logicalKeyFields, comparatorHelperIndex);
- //final TypeSerializer[] finalFieldSerializers = Arrays.copyOf(fieldSerializers, comparatorHelperIndex);
- // create the serializers for the prefix up to highest key position
- int maxKey = 0;
- for(int key : finalLogicalKeyFields) {
- maxKey = Math.max(maxKey, key);
+ private class TupleTypeComparatorBuilder implements TypeComparatorBuilder<T> {
+
+ private final ArrayList<TypeComparator> fieldComparators = new ArrayList<TypeComparator>();
+ private final ArrayList<Integer> logicalKeyFields = new ArrayList<Integer>();
+
+ @Override
+ public void initializeTypeComparatorBuilder(int size) {
+ fieldComparators.ensureCapacity(size);
+ logicalKeyFields.ensureCapacity(size);
}
- TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[maxKey + 1];
- for (int i = 0; i <= maxKey; i++) {
- fieldSerializers[i] = types[i].createSerializer(executionConfig);
+
+ @Override
+ public void addComparatorField(int fieldId, TypeComparator<?> comparator) {
+ fieldComparators.add(comparator);
+ logicalKeyFields.add(fieldId);
}
- if(finalFieldComparators.length == 0 || finalLogicalKeyFields.length == 0 || fieldSerializers.length == 0
- || finalFieldComparators.length != finalLogicalKeyFields.length) {
- throw new IllegalArgumentException("Tuple comparator creation has a bug");
+
+ @Override
+ public TypeComparator<T> createTypeComparator(ExecutionConfig config) {
+ Preconditions.checkState(
+ fieldComparators.size() > 0,
+ "No field comparators were defined for the TupleTypeComparatorBuilder."
+ );
+
+ Preconditions.checkState(
+ logicalKeyFields.size() > 0,
+ "No key fields were defined for the TupleTypeComparatorBuilder."
+ );
+
+ Preconditions.checkState(
+ fieldComparators.size() == logicalKeyFields.size(),
+ "The number of field comparators and key fields is not equal."
+ );
+
+ final int maxKey = Collections.max(logicalKeyFields);
+
+ Preconditions.checkState(
+ maxKey >= 0,
+ "The maximum key field must be greater or equal than 0."
+ );
+
+ TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[maxKey + 1];
+
+ for (int i = 0; i <= maxKey; i++) {
+ fieldSerializers[i] = types[i].createSerializer(config);
+ }
+
+ return new TupleComparator<T>(
+ Ints.toArray(logicalKeyFields),
+ fieldComparators.toArray(new TypeComparator[fieldComparators.size()]),
+ fieldSerializers
+ );
}
- return new TupleComparator<T>(finalLogicalKeyFields, finalFieldComparators, fieldSerializers);
}
// --------------------------------------------------------------------------------------------
@@ -142,17 +166,22 @@ public final class TupleTypeInfo<T extends Tuple> extends TupleTypeInfoBase<T> {
if (obj instanceof TupleTypeInfo) {
@SuppressWarnings("unchecked")
TupleTypeInfo<T> other = (TupleTypeInfo<T>) obj;
- return ((this.tupleType == null && other.tupleType == null) || this.tupleType.equals(other.tupleType)) &&
- Arrays.deepEquals(this.types, other.types);
-
+ return other.canEqual(this) &&
+ super.equals(other) &&
+ Arrays.equals(fieldNames, other.fieldNames);
} else {
return false;
}
}
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof TupleTypeInfo;
+ }
@Override
public int hashCode() {
- return this.types.hashCode() ^ Arrays.deepHashCode(this.types);
+ return 31 * super.hashCode() + Arrays.hashCode(fieldNames);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
index a2d937f..469476f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfoBase.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import com.google.common.base.Preconditions;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.java.operators.Keys.ExpressionKeys;
@@ -45,17 +46,20 @@ public abstract class TupleTypeInfoBase<T> extends CompositeType<T> {
protected final TypeInformation<?>[] types;
- protected final Class<T> tupleType;
-
- private int totalFields;
+ private final int totalFields;
public TupleTypeInfoBase(Class<T> tupleType, TypeInformation<?>... types) {
super(tupleType);
- this.tupleType = tupleType;
- this.types = types;
+
+ this.types = Preconditions.checkNotNull(types);
+
+ int fieldCounter = 0;
+
for(TypeInformation<?> type : types) {
- totalFields += type.getTotalFields();
+ fieldCounter += type.getTotalFields();
}
+
+ totalFields = fieldCounter;
}
@Override
@@ -83,11 +87,6 @@ public abstract class TupleTypeInfoBase<T> extends CompositeType<T> {
}
@Override
- public Class<T> getTypeClass() {
- return tupleType;
- }
-
- @Override
public void getFlatFields(String fieldExpression, int offset, List<FlatFieldDescriptor> result) {
Matcher matcher = PATTERN_NESTED_FIELDS_WILDCARD.matcher(fieldExpression);
@@ -109,53 +108,49 @@ public abstract class TupleTypeInfoBase<T> extends CompositeType<T> {
}
keyPosition++;
}
- return;
- }
+ } else {
+ String fieldStr = matcher.group(1);
+ Matcher fieldMatcher = PATTERN_FIELD.matcher(fieldStr);
- String fieldStr = matcher.group(1);
- Matcher fieldMatcher = PATTERN_FIELD.matcher(fieldStr);
- if (!fieldMatcher.matches()) {
- throw new RuntimeException("Invalid matcher pattern");
- }
- field = fieldMatcher.group(2);
- int fieldPos = Integer.valueOf(field);
+ if (!fieldMatcher.matches()) {
+ throw new RuntimeException("Invalid matcher pattern");
+ }
- if (fieldPos >= this.getArity()) {
- throw new InvalidFieldReferenceException("Tuple field expression \"" + fieldStr + "\" out of bounds of " + this.toString() + ".");
- }
- TypeInformation<?> fieldType = this.getTypeAt(fieldPos);
- String tail = matcher.group(5);
- if(tail == null) {
- if(fieldType instanceof CompositeType) {
- // forward offsets
- for(int i=0; i<fieldPos; i++) {
- offset += this.getTypeAt(i).getTotalFields();
- }
- // add all fields of composite type
- ((CompositeType<?>) fieldType).getFlatFields("*", offset, result);
- return;
- } else {
- // we found the field to add
- // compute flat field position by adding skipped fields
- int flatFieldPos = offset;
- for(int i=0; i<fieldPos; i++) {
- flatFieldPos += this.getTypeAt(i).getTotalFields();
- }
- result.add(new FlatFieldDescriptor(flatFieldPos, fieldType));
- // nothing left to do
- return;
+ field = fieldMatcher.group(2);
+ int fieldPos = Integer.valueOf(field);
+
+ if (fieldPos >= this.getArity()) {
+ throw new InvalidFieldReferenceException("Tuple field expression \"" + fieldStr + "\" out of bounds of " + this.toString() + ".");
}
- } else {
- if(fieldType instanceof CompositeType<?>) {
- // forward offset
- for(int i=0; i<fieldPos; i++) {
- offset += this.getTypeAt(i).getTotalFields();
+ TypeInformation<?> fieldType = this.getTypeAt(fieldPos);
+ String tail = matcher.group(5);
+ if (tail == null) {
+ if (fieldType instanceof CompositeType) {
+ // forward offsets
+ for (int i = 0; i < fieldPos; i++) {
+ offset += this.getTypeAt(i).getTotalFields();
+ }
+ // add all fields of composite type
+ ((CompositeType<?>) fieldType).getFlatFields("*", offset, result);
+ } else {
+ // we found the field to add
+ // compute flat field position by adding skipped fields
+ int flatFieldPos = offset;
+ for (int i = 0; i < fieldPos; i++) {
+ flatFieldPos += this.getTypeAt(i).getTotalFields();
+ }
+ result.add(new FlatFieldDescriptor(flatFieldPos, fieldType));
}
- ((CompositeType<?>) fieldType).getFlatFields(tail, offset, result);
- // nothing left to do
- return;
} else {
- throw new InvalidFieldReferenceException("Nested field expression \""+tail+"\" not possible on atomic type "+fieldType+".");
+ if (fieldType instanceof CompositeType<?>) {
+ // forward offset
+ for (int i = 0; i < fieldPos; i++) {
+ offset += this.getTypeAt(i).getTotalFields();
+ }
+ ((CompositeType<?>) fieldType).getFlatFields(tail, offset, result);
+ } else {
+ throw new InvalidFieldReferenceException("Nested field expression \"" + tail + "\" not possible on atomic type " + fieldType + ".");
+ }
}
}
}
@@ -213,17 +208,24 @@ public abstract class TupleTypeInfoBase<T> extends CompositeType<T> {
if (obj instanceof TupleTypeInfoBase) {
@SuppressWarnings("unchecked")
TupleTypeInfoBase<T> other = (TupleTypeInfoBase<T>) obj;
- return ((this.tupleType == null && other.tupleType == null) || this.tupleType.equals(other.tupleType)) &&
- Arrays.deepEquals(this.types, other.types);
-
+
+ return other.canEqual(this) &&
+ super.equals(other) &&
+ Arrays.equals(types, other.types) &&
+ totalFields == other.totalFields;
} else {
return false;
}
}
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof TupleTypeInfoBase;
+ }
@Override
public int hashCode() {
- return this.types.hashCode() ^ Arrays.deepHashCode(this.types);
+ return 31 * (31 * super.hashCode() + Arrays.hashCode(types)) + totalFields;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
index 8f5d599..0196b5d 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
@@ -526,12 +526,33 @@ public class TypeExtractor {
} catch (ClassNotFoundException e) {
throw new InvalidTypesException("Could not convert GenericArrayType to Class.");
}
+
return getForClass(classArray);
+ } else {
+ TypeInformation<?> componentInfo = createTypeInfoWithTypeHierarchy(
+ typeHierarchy,
+ genericArray.getGenericComponentType(),
+ in1Type,
+ in2Type);
+
+ Class<OUT> classArray;
+
+ try {
+ String componentClassName = componentInfo.getTypeClass().getName();
+ String resultingClassName;
+
+ if (componentClassName.startsWith("[")) {
+ resultingClassName = "[" + componentClassName;
+ } else {
+ resultingClassName = "[L" + componentClassName + ";";
+ }
+ classArray = (Class<OUT>) Class.forName(resultingClassName);
+ } catch (ClassNotFoundException e) {
+ throw new InvalidTypesException("Could not convert GenericArrayType to Class.");
+ }
+
+ return ObjectArrayTypeInfo.getInfoFor(classArray, componentInfo);
}
-
- TypeInformation<?> componentInfo = createTypeInfoWithTypeHierarchy(typeHierarchy, genericArray.getGenericComponentType(),
- in1Type, in2Type);
- return ObjectArrayTypeInfo.getInfoFor(t, componentInfo);
}
// objects with generics are treated as Class first
else if (t instanceof ParameterizedType) {
@@ -1188,7 +1209,13 @@ public class TypeExtractor {
// object arrays
else {
- return ObjectArrayTypeInfo.getInfoFor(clazz);
+ TypeInformation<?> componentTypeInfo = createTypeInfoWithTypeHierarchy(
+ typeHierarchy,
+ clazz.getComponentType(),
+ in1Type,
+ in2Type);
+
+ return ObjectArrayTypeInfo.getInfoFor(clazz, componentTypeInfo);
}
}
@@ -1481,8 +1508,8 @@ public class TypeExtractor {
private static TypeInformation<?> getTypeOfPojoField(TypeInformation<?> pojoInfo, Field field) {
for (int j = 0; j < pojoInfo.getArity(); j++) {
PojoField pf = ((PojoTypeInfo<?>) pojoInfo).getPojoFieldAt(j);
- if (pf.field.getName().equals(field.getName())) {
- return pf.type;
+ if (pf.getField().getName().equals(field.getName())) {
+ return pf.getTypeInformation();
}
}
return null;
http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
index e61acd8..0b4823e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/ValueTypeInfo.java
@@ -18,6 +18,7 @@
package org.apache.flink.api.java.typeutils;
+import com.google.common.base.Preconditions;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.typeinfo.AtomicType;
@@ -52,17 +53,13 @@ public class ValueTypeInfo<T extends Value> extends TypeInformation<T> implement
private static final long serialVersionUID = 1L;
private final Class<T> type;
-
public ValueTypeInfo(Class<T> type) {
- if (type == null) {
- throw new NullPointerException();
- }
- if (!Value.class.isAssignableFrom(type) && !type.equals(Value.class)) {
- throw new IllegalArgumentException("ValueTypeInfo can only be used for subclasses of " + Value.class.getName());
- }
-
- this.type = type;
+ this.type = Preconditions.checkNotNull(type);
+
+ Preconditions.checkArgument(
+ Value.class.isAssignableFrom(type) || type.equals(Value.class),
+ "ValueTypeInfo can only be used for subclasses of " + Value.class.getName());
}
@Override
@@ -136,17 +133,26 @@ public class ValueTypeInfo<T extends Value> extends TypeInformation<T> implement
@Override
public int hashCode() {
- return this.type.hashCode() ^ 0xd3a2646c;
+ return this.type.hashCode();
}
@Override
public boolean equals(Object obj) {
- if (obj.getClass() == ValueTypeInfo.class) {
- return type == ((ValueTypeInfo<?>) obj).type;
+ if (obj instanceof ValueTypeInfo) {
+ @SuppressWarnings("unchecked")
+ ValueTypeInfo<T> valueTypeInfo = (ValueTypeInfo<T>) obj;
+
+ return valueTypeInfo.canEqual(this) &&
+ type == valueTypeInfo.type;
} else {
return false;
}
}
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof ValueTypeInfo;
+ }
@Override
public String toString() {
@@ -155,7 +161,7 @@ public class ValueTypeInfo<T extends Value> extends TypeInformation<T> implement
// --------------------------------------------------------------------------------------------
- static final <X extends Value> TypeInformation<X> getValueTypeInfo(Class<X> typeClass) {
+ static <X extends Value> TypeInformation<X> getValueTypeInfo(Class<X> typeClass) {
if (Value.class.isAssignableFrom(typeClass) && !typeClass.equals(Value.class)) {
return new ValueTypeInfo<X>(typeClass);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
index a91b888..6c140d9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/WritableTypeInfo.java
@@ -18,6 +18,7 @@
package org.apache.flink.api.java.typeutils;
+import com.google.common.base.Preconditions;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.typeinfo.AtomicType;
@@ -41,13 +42,11 @@ public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> imp
private final Class<T> typeClass;
public WritableTypeInfo(Class<T> typeClass) {
- if (typeClass == null) {
- throw new NullPointerException();
- }
- if (!Writable.class.isAssignableFrom(typeClass) || typeClass == Writable.class) {
- throw new IllegalArgumentException("WritableTypeInfo can only be used for subclasses of " + Writable.class.getName());
- }
- this.typeClass = typeClass;
+ this.typeClass = Preconditions.checkNotNull(typeClass);
+
+ Preconditions.checkArgument(
+ Writable.class.isAssignableFrom(typeClass) && !typeClass.equals(Writable.class),
+ "WritableTypeInfo can only be used for subclasses of " + Writable.class.getName());
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@@ -104,12 +103,26 @@ public class WritableTypeInfo<T extends Writable> extends TypeInformation<T> imp
@Override
public int hashCode() {
- return typeClass.hashCode() ^ 0xd3a2646c;
+ return typeClass.hashCode();
}
@Override
public boolean equals(Object obj) {
- return obj.getClass() == WritableTypeInfo.class && typeClass == ((WritableTypeInfo<?>) obj).typeClass;
+ if (obj instanceof WritableTypeInfo) {
+ @SuppressWarnings("unchecked")
+ WritableTypeInfo<T> writableTypeInfo = (WritableTypeInfo<T>) obj;
+
+ return writableTypeInfo.canEqual(this) &&
+ typeClass == writableTypeInfo.typeClass;
+
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof WritableTypeInfo;
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
index 34dc500..26bf4ce 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java
@@ -25,6 +25,7 @@ import java.io.IOException;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
+import com.google.common.base.Preconditions;
import org.apache.avro.generic.GenericData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
@@ -69,14 +70,10 @@ public final class AvroSerializer<T> extends TypeSerializer<T> {
}
public AvroSerializer(Class<T> type, Class<? extends T> typeToInstantiate) {
- if (type == null || typeToInstantiate == null) {
- throw new NullPointerException();
- }
+ this.type = Preconditions.checkNotNull(type);
+ this.typeToInstantiate = Preconditions.checkNotNull(typeToInstantiate);
InstantiationUtil.checkForInstantiation(typeToInstantiate);
-
- this.type = type;
- this.typeToInstantiate = typeToInstantiate;
}
// --------------------------------------------------------------------------------------------
@@ -192,16 +189,25 @@ public final class AvroSerializer<T> extends TypeSerializer<T> {
@Override
public int hashCode() {
- return 0x42fba55c + this.type.hashCode() + this.typeToInstantiate.hashCode();
+ return 31 * this.type.hashCode() + this.typeToInstantiate.hashCode();
}
@Override
public boolean equals(Object obj) {
- if (obj.getClass() == AvroSerializer.class) {
- AvroSerializer<?> other = (AvroSerializer<?>) obj;
- return this.type == other.type && this.typeToInstantiate == other.typeToInstantiate;
+ if (obj instanceof AvroSerializer) {
+ @SuppressWarnings("unchecked")
+ AvroSerializer<T> avroSerializer = (AvroSerializer<T>) obj;
+
+ return avroSerializer.canEqual(this) &&
+ type == avroSerializer.type &&
+ typeToInstantiate == avroSerializer.typeToInstantiate;
} else {
return false;
}
}
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof AvroSerializer;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
index 193d495..9e46f27 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.java.typeutils.runtime;
import java.io.IOException;
+import com.google.common.base.Preconditions;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -38,7 +39,7 @@ public class CopyableValueSerializer<T extends CopyableValue<T>> extends TypeSer
public CopyableValueSerializer(Class<T> valueClass) {
- this.valueClass = valueClass;
+ this.valueClass = Preconditions.checkNotNull(valueClass);
}
@Override
@@ -105,16 +106,24 @@ public class CopyableValueSerializer<T extends CopyableValue<T>> extends TypeSer
@Override
public int hashCode() {
- return this.valueClass.hashCode() + 9231;
+ return this.valueClass.hashCode();
}
@Override
public boolean equals(Object obj) {
- if (obj.getClass() == CopyableValueSerializer.class) {
- CopyableValueSerializer<?> other = (CopyableValueSerializer<?>) obj;
- return this.valueClass == other.valueClass;
+ if (obj instanceof CopyableValueSerializer) {
+ @SuppressWarnings("unchecked")
+ CopyableValueSerializer<T> copyableValueSerializer = (CopyableValueSerializer<T>) obj;
+
+ return copyableValueSerializer.canEqual(this) &&
+ valueClass == copyableValueSerializer.valueClass;
} else {
return false;
}
}
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof CopyableValueSerializer;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
index 5d4553d..de24956 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java
@@ -30,7 +30,9 @@ import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import com.google.common.base.Preconditions;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -51,33 +53,33 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
private final Class<T> clazz;
- private TypeSerializer<Object>[] fieldSerializers;
+ private final TypeSerializer<Object>[] fieldSerializers;
- // We need to handle these ourselves in writeObject()/readObject()
- private transient Field[] fields;
-
- private int numFields;
+ private final int numFields;
- private transient Map<Class<?>, TypeSerializer<?>> subclassSerializerCache;
- private transient ClassLoader cl;
+ private final Map<Class<?>, Integer> registeredClasses;
- private Map<Class<?>, Integer> registeredClasses;
-
- private TypeSerializer<?>[] registeredSerializers;
+ private final TypeSerializer<?>[] registeredSerializers;
private final ExecutionConfig executionConfig;
+ private transient Map<Class<?>, TypeSerializer<?>> subclassSerializerCache;
+ private transient ClassLoader cl;
+ // We need to handle these ourselves in writeObject()/readObject()
+ private transient Field[] fields;
+
@SuppressWarnings("unchecked")
public PojoSerializer(
Class<T> clazz,
TypeSerializer<?>[] fieldSerializers,
Field[] fields,
ExecutionConfig executionConfig) {
- this.clazz = clazz;
- this.fieldSerializers = (TypeSerializer<Object>[]) fieldSerializers;
- this.fields = fields;
+
+ this.clazz = Preconditions.checkNotNull(clazz);
+ this.fieldSerializers = (TypeSerializer<Object>[]) Preconditions.checkNotNull(fieldSerializers);
+ this.fields = Preconditions.checkNotNull(fields);
this.numFields = fieldSerializers.length;
- this.executionConfig = executionConfig;
+ this.executionConfig = Preconditions.checkNotNull(executionConfig);
LinkedHashSet<Class<?>> registeredPojoTypes = executionConfig.getRegisteredPojoTypes();
@@ -563,23 +565,28 @@ public final class PojoSerializer<T> extends TypeSerializer<T> {
@Override
public int hashCode() {
- int hashCode = numFields * 47;
- for (TypeSerializer<?> ser : this.fieldSerializers) {
- hashCode = (hashCode << 7) | (hashCode >>> -7);
- hashCode += ser.hashCode();
- }
- return hashCode;
+ return 31 * (31 * Arrays.hashCode(fieldSerializers) + Arrays.hashCode(registeredSerializers)) +
+ Objects.hash(clazz, numFields, registeredClasses);
}
@Override
public boolean equals(Object obj) {
- if (obj != null && obj instanceof PojoSerializer) {
- PojoSerializer<?> otherTS = (PojoSerializer<?>) obj;
- return (otherTS.clazz == this.clazz) &&
- Arrays.deepEquals(this.fieldSerializers, otherTS.fieldSerializers);
- }
- else {
+ if (obj instanceof PojoSerializer) {
+ PojoSerializer<?> other = (PojoSerializer<?>) obj;
+
+ return other.canEqual(this) &&
+ clazz == other.clazz &&
+ Arrays.equals(fieldSerializers, other.fieldSerializers) &&
+ Arrays.equals(registeredSerializers, other.registeredSerializers) &&
+ numFields == other.numFields &&
+ registeredClasses.equals(other.registeredClasses);
+ } else {
return false;
}
}
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof PojoSerializer;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java
index 246cecf..a06ff1a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/Tuple0Serializer.java
@@ -13,6 +13,7 @@
package org.apache.flink.api.java.typeutils.runtime;
import java.io.IOException;
+
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple0;
import org.apache.flink.core.memory.DataInputView;
@@ -94,12 +95,23 @@ public class Tuple0Serializer extends TupleSerializer<Tuple0> {
@Override
public int hashCode() {
- return 1837461876;
+ return Tuple0Serializer.class.hashCode();
}
@Override
public boolean equals(Object obj) {
- return obj == this || obj instanceof Tuple0Serializer;
+ if (obj instanceof Tuple0Serializer) {
+ Tuple0Serializer other = (Tuple0Serializer) obj;
+
+ return other.canEqual(this);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof Tuple0Serializer;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
index f041736..bf3c7a1 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
@@ -18,12 +18,14 @@
package org.apache.flink.api.java.typeutils.runtime;
+import com.google.common.base.Preconditions;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Objects;
public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
@@ -32,14 +34,14 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
protected final Class<T> tupleClass;
- protected TypeSerializer<Object>[] fieldSerializers;
+ protected final TypeSerializer<Object>[] fieldSerializers;
protected final int arity;
@SuppressWarnings("unchecked")
public TupleSerializerBase(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers) {
- this.tupleClass = tupleClass;
- this.fieldSerializers = (TypeSerializer<Object>[]) fieldSerializers;
+ this.tupleClass = Preconditions.checkNotNull(tupleClass);
+ this.fieldSerializers = (TypeSerializer<Object>[]) Preconditions.checkNotNull(fieldSerializers);
this.arity = fieldSerializers.length;
}
@@ -74,23 +76,25 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
@Override
public int hashCode() {
- int hashCode = arity * 47;
- for (TypeSerializer<?> ser : this.fieldSerializers) {
- hashCode = (hashCode << 7) | (hashCode >>> -7);
- hashCode += ser.hashCode();
- }
- return hashCode;
+ return 31 * Arrays.hashCode(fieldSerializers) + Objects.hash(tupleClass, arity);
}
@Override
public boolean equals(Object obj) {
- if (obj != null && obj instanceof TupleSerializerBase) {
- TupleSerializerBase<?> otherTS = (TupleSerializerBase<?>) obj;
- return (otherTS.tupleClass == this.tupleClass) &&
- Arrays.deepEquals(this.fieldSerializers, otherTS.fieldSerializers);
- }
- else {
+ if (obj instanceof TupleSerializerBase) {
+ TupleSerializerBase<?> other = (TupleSerializerBase<?>) obj;
+
+ return other.canEqual(this) &&
+ tupleClass == other.tupleClass &&
+ Arrays.equals(fieldSerializers, other.fieldSerializers) &&
+ arity == other.arity;
+ } else {
return false;
}
}
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof TupleSerializerBase;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
index ad1b0f0..179ef19 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.java.typeutils.runtime;
import java.io.IOException;
+import com.google.common.base.Preconditions;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
@@ -47,11 +48,7 @@ public class ValueSerializer<T extends Value> extends TypeSerializer<T> {
// --------------------------------------------------------------------------------------------
public ValueSerializer(Class<T> type) {
- if (type == null) {
- throw new NullPointerException();
- }
-
- this.type = type;
+ this.type = Preconditions.checkNotNull(type);
}
// --------------------------------------------------------------------------------------------
@@ -126,16 +123,22 @@ public class ValueSerializer<T extends Value> extends TypeSerializer<T> {
@Override
public int hashCode() {
- return this.type.hashCode() + 17;
+ return this.type.hashCode();
}
@Override
public boolean equals(Object obj) {
- if (obj.getClass() == ValueSerializer.class) {
+ if (obj instanceof ValueSerializer) {
ValueSerializer<?> other = (ValueSerializer<?>) obj;
- return this.type == other.type;
+
+ return other.canEqual(this) && type == other.type;
} else {
return false;
}
}
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof ValueSerializer;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
index 60012ee..d854f52 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java
@@ -121,16 +121,22 @@ public class WritableSerializer<T extends Writable> extends TypeSerializer<T> {
@Override
public int hashCode() {
- return this.typeClass.hashCode() + 177;
+ return this.typeClass.hashCode();
}
@Override
public boolean equals(Object obj) {
- if (obj.getClass() == WritableSerializer.class) {
+ if (obj instanceof WritableSerializer) {
WritableSerializer<?> other = (WritableSerializer<?>) obj;
- return this.typeClass == other.typeClass;
+
+ return other.canEqual(this) && typeClass == other.typeClass;
} else {
return false;
}
}
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof WritableSerializer;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
index 8ae3562..28473a2 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java
@@ -25,6 +25,7 @@ import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.base.Preconditions;
import com.twitter.chill.ScalaKryoInstantiator;
import org.apache.avro.generic.GenericData;
@@ -44,6 +45,7 @@ import java.io.IOException;
import java.lang.reflect.Modifier;
import java.util.LinkedHashSet;
import java.util.List;
+import java.util.Objects;
/**
* A type serializer that serializes its type using the Kryo serialization
@@ -83,10 +85,7 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
// ------------------------------------------------------------------------
public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){
- if(type == null){
- throw new NullPointerException("Type class cannot be null.");
- }
- this.type = type;
+ this.type = Preconditions.checkNotNull(type);
this.defaultSerializers = executionConfig.getDefaultKryoSerializers();
this.defaultSerializerClasses = executionConfig.getDefaultKryoSerializerClasses();
@@ -240,19 +239,34 @@ public class KryoSerializer<T> extends TypeSerializer<T> {
@Override
public int hashCode() {
- return type.hashCode();
+ return Objects.hash(
+ type,
+ registeredTypes,
+ registeredTypesWithSerializerClasses,
+ defaultSerializerClasses);
}
@Override
public boolean equals(Object obj) {
- if (obj != null && obj instanceof KryoSerializer) {
+ if (obj instanceof KryoSerializer) {
KryoSerializer<?> other = (KryoSerializer<?>) obj;
- return other.type == this.type;
+
+ // we cannot include the Serializers here because they don't implement the equals method
+ return other.canEqual(this) &&
+ type == other.type &&
+ registeredTypes.equals(other.registeredTypes) &&
+ registeredTypesWithSerializerClasses.equals(other.registeredTypesWithSerializerClasses) &&
+ defaultSerializerClasses.equals(other.defaultSerializerClasses);
} else {
return false;
}
}
-
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof KryoSerializer;
+ }
+
// --------------------------------------------------------------------------------------------
private void checkKryoInitialized() {
http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
index ad491d0..ebaa44d 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
@@ -43,6 +43,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
public class CollectionInputFormatTest {
@@ -265,8 +266,8 @@ public class CollectionInputFormatTest {
private static final long serialVersionUID = 1L;
- private boolean failOnRead;
- private boolean failOnWrite;
+ private final boolean failOnRead;
+ private final boolean failOnWrite;
public TestSerializer(boolean failOnRead, boolean failOnWrite) {
this.failOnRead = failOnRead;
@@ -331,5 +332,26 @@ public class CollectionInputFormatTest {
public void copy(DataInputView source, DataOutputView target) throws IOException {
target.writeInt(source.readInt());
}
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof TestSerializer) {
+ TestSerializer other = (TestSerializer) obj;
+
+ return other.canEqual(this) && failOnRead == other.failOnRead && failOnWrite == other.failOnWrite;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj instanceof TestSerializer;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(failOnRead, failOnWrite);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
index 34fde20..96ba264 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/PojoTypeExtractionTest.java
@@ -336,37 +336,37 @@ public class PojoTypeExtractionTest {
tupleSeen = false, objectSeen = false, writableSeen = false, collectionSeen = false;
for(int i = 0; i < pojoTypeComplexNested.getArity(); i++) {
PojoField field = pojoTypeComplexNested.getPojoFieldAt(i);
- String name = field.field.getName();
+ String name = field.getField().getName();
if(name.equals("date")) {
if(dateSeen) {
Assert.fail("already seen");
}
dateSeen = true;
- Assert.assertEquals(BasicTypeInfo.DATE_TYPE_INFO, field.type);
- Assert.assertEquals(Date.class, field.type.getTypeClass());
+ Assert.assertEquals(BasicTypeInfo.DATE_TYPE_INFO, field.getTypeInformation());
+ Assert.assertEquals(Date.class, field.getTypeInformation().getTypeClass());
} else if(name.equals("someNumberWithÜnicödeNäme")) {
if(intSeen) {
Assert.fail("already seen");
}
intSeen = true;
- Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type);
- Assert.assertEquals(Integer.class, field.type.getTypeClass());
+ Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
+ Assert.assertEquals(Integer.class, field.getTypeInformation().getTypeClass());
} else if(name.equals("someFloat")) {
if(floatSeen) {
Assert.fail("already seen");
}
floatSeen = true;
- Assert.assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, field.type);
- Assert.assertEquals(Float.class, field.type.getTypeClass());
+ Assert.assertEquals(BasicTypeInfo.FLOAT_TYPE_INFO, field.getTypeInformation());
+ Assert.assertEquals(Float.class, field.getTypeInformation().getTypeClass());
} else if(name.equals("word")) {
if(tupleSeen) {
Assert.fail("already seen");
}
tupleSeen = true;
- Assert.assertTrue(field.type instanceof TupleTypeInfo<?>);
- Assert.assertEquals(Tuple3.class, field.type.getTypeClass());
+ Assert.assertTrue(field.getTypeInformation() instanceof TupleTypeInfo<?>);
+ Assert.assertEquals(Tuple3.class, field.getTypeInformation().getTypeClass());
// do some more advanced checks on the tuple
- TupleTypeInfo<?> tupleTypeFromComplexNested = (TupleTypeInfo<?>) field.type;
+ TupleTypeInfo<?> tupleTypeFromComplexNested = (TupleTypeInfo<?>) field.getTypeInformation();
Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, tupleTypeFromComplexNested.getTypeAt(0));
Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, tupleTypeFromComplexNested.getTypeAt(1));
Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, tupleTypeFromComplexNested.getTypeAt(2));
@@ -375,21 +375,21 @@ public class PojoTypeExtractionTest {
Assert.fail("already seen");
}
objectSeen = true;
- Assert.assertEquals(new GenericTypeInfo<Object>(Object.class), field.type);
- Assert.assertEquals(Object.class, field.type.getTypeClass());
+ Assert.assertEquals(new GenericTypeInfo<Object>(Object.class), field.getTypeInformation());
+ Assert.assertEquals(Object.class, field.getTypeInformation().getTypeClass());
} else if(name.equals("hadoopCitizen")) {
if(writableSeen) {
Assert.fail("already seen");
}
writableSeen = true;
- Assert.assertEquals(new WritableTypeInfo<MyWritable>(MyWritable.class), field.type);
- Assert.assertEquals(MyWritable.class, field.type.getTypeClass());
+ Assert.assertEquals(new WritableTypeInfo<MyWritable>(MyWritable.class), field.getTypeInformation());
+ Assert.assertEquals(MyWritable.class, field.getTypeInformation().getTypeClass());
} else if(name.equals("collection")) {
if(collectionSeen) {
Assert.fail("already seen");
}
collectionSeen = true;
- Assert.assertEquals(new GenericTypeInfo(List.class), field.type);
+ Assert.assertEquals(new GenericTypeInfo(List.class), field.getTypeInformation());
} else {
Assert.fail("field "+field+" is not expected");
@@ -428,28 +428,28 @@ public class PojoTypeExtractionTest {
PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeInformation;
for(int i = 0; i < pojoTypeForClass.getArity(); i++) {
PojoField field = pojoTypeForClass.getPojoFieldAt(i);
- String name = field.field.getName();
+ String name = field.getField().getName();
if(name.equals("somethingFancy")) {
if(arrayListSeen) {
Assert.fail("already seen");
}
arrayListSeen = true;
- Assert.assertTrue(field.type instanceof GenericTypeInfo);
- Assert.assertEquals(ArrayList.class, field.type.getTypeClass());
+ Assert.assertTrue(field.getTypeInformation() instanceof GenericTypeInfo);
+ Assert.assertEquals(ArrayList.class, field.getTypeInformation().getTypeClass());
} else if(name.equals("fancyIds")) {
if(multisetSeen) {
Assert.fail("already seen");
}
multisetSeen = true;
- Assert.assertTrue(field.type instanceof GenericTypeInfo);
- Assert.assertEquals(HashMultiset.class, field.type.getTypeClass());
+ Assert.assertTrue(field.getTypeInformation() instanceof GenericTypeInfo);
+ Assert.assertEquals(HashMultiset.class, field.getTypeInformation().getTypeClass());
} else if(name.equals("fancyArray")) {
if(strArraySeen) {
Assert.fail("already seen");
}
strArraySeen = true;
- Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, field.type);
- Assert.assertEquals(String[].class, field.type.getTypeClass());
+ Assert.assertEquals(BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO, field.getTypeInformation());
+ Assert.assertEquals(String[].class, field.getTypeInformation().getTypeClass());
} else if(Arrays.asList("date", "someNumberWithÜnicödeNäme", "someFloat", "word", "nothing", "hadoopCitizen", "collection").contains(name)) {
// ignore these, they are inherited from the ComplexNestedClass
}
@@ -479,13 +479,13 @@ public class PojoTypeExtractionTest {
PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeInformation;
for(int i = 0; i < pojoTypeForClass.getArity(); i++) {
PojoField field = pojoTypeForClass.getPojoFieldAt(i);
- String name = field.field.getName();
+ String name = field.getField().getName();
if(name.equals("special")) {
- Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type);
+ Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
} else if(name.equals("f0") || name.equals("f1")) {
- Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.type);
+ Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.getTypeInformation());
} else if(name.equals("f2")) {
- Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.type);
+ Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.getTypeInformation());
} else {
Assert.fail("unexpected field");
}
@@ -499,15 +499,15 @@ public class PojoTypeExtractionTest {
PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeForClass;
for(int i = 0; i < pojoTypeForClass.getArity(); i++) {
PojoField field = pojoTypeForClass.getPojoFieldAt(i);
- String name = field.field.getName();
+ String name = field.getField().getName();
if(name.equals("field1")) {
- Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type);
+ Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
} else if (name.equals("field2")) {
- Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.type);
+ Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.getTypeInformation());
} else if (name.equals("field3")) {
- Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.type);
+ Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.getTypeInformation());
} else if (name.equals("key")) {
- Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type);
+ Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
} else {
Assert.fail("Unexpected field "+field);
}
@@ -525,13 +525,13 @@ public class PojoTypeExtractionTest {
PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeForClass;
for(int i = 0; i < pojoTypeForClass.getArity(); i++) {
PojoField field = pojoTypeForClass.getPojoFieldAt(i);
- String name = field.field.getName();
+ String name = field.getField().getName();
if(name.equals("field1")) {
- Assert.assertEquals(new GenericTypeInfo<Object>(Object.class), field.type);
+ Assert.assertEquals(new GenericTypeInfo<Object>(Object.class), field.getTypeInformation());
} else if (name.equals("field2")) {
- Assert.assertEquals(new GenericTypeInfo<Object>(Object.class), field.type);
+ Assert.assertEquals(new GenericTypeInfo<Object>(Object.class), field.getTypeInformation());
} else if (name.equals("key")) {
- Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type);
+ Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
} else {
Assert.fail("Unexpected field "+field);
}
@@ -546,14 +546,14 @@ public class PojoTypeExtractionTest {
PojoTypeInfo<?> pojoTypeForClass = (PojoTypeInfo<?>) typeForClass;
for(int i = 0; i < pojoTypeForClass.getArity(); i++) {
PojoField field = pojoTypeForClass.getPojoFieldAt(i);
- String name = field.field.getName();
+ String name = field.getField().getName();
if(name.equals("field1")) {
- Assert.assertTrue(field.type instanceof PojoTypeInfo<?>); // From tuple is pojo (not tuple type!)
+ Assert.assertTrue(field.getTypeInformation() instanceof PojoTypeInfo<?>); // From tuple is pojo (not tuple type!)
} else if (name.equals("field2")) {
- Assert.assertTrue(field.type instanceof TupleTypeInfo<?>);
- Assert.assertTrue( ((TupleTypeInfo<?>)field.type).getTypeAt(0).equals(BasicTypeInfo.STRING_TYPE_INFO) );
+ Assert.assertTrue(field.getTypeInformation() instanceof TupleTypeInfo<?>);
+ Assert.assertTrue( ((TupleTypeInfo<?>)field.getTypeInformation()).getTypeAt(0).equals(BasicTypeInfo.STRING_TYPE_INFO) );
} else if (name.equals("key")) {
- Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type);
+ Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
} else {
Assert.fail("Unexpected field "+field);
}
@@ -641,13 +641,13 @@ public class PojoTypeExtractionTest {
PojoTypeInfo<?> pti = (PojoTypeInfo<?>) ti;
for(int i = 0; i < pti.getArity(); i++) {
PojoField field = pti.getPojoFieldAt(i);
- String name = field.field.getName();
+ String name = field.getField().getName();
if(name.equals("field1")) {
- Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.type);
+ Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.getTypeInformation());
} else if (name.equals("field2")) {
- Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.type);
+ Assert.assertEquals(BasicTypeInfo.STRING_TYPE_INFO, field.getTypeInformation());
} else if (name.equals("key")) {
- Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.type);
+ Assert.assertEquals(BasicTypeInfo.INT_TYPE_INFO, field.getTypeInformation());
} else {
Assert.fail("Unexpected field "+field);
}
@@ -680,15 +680,15 @@ public class PojoTypeExtractionTest {
PojoTypeInfo<?> pti = (PojoTypeInfo<?>) ti;
for(int i = 0; i < pti.getArity(); i++) {
PojoField field = pti.getPojoFieldAt(i);
- String name = field.field.getName();
+ String name = field.getField().getName();
if(name.equals("extraField")) {
- Assert.assertEquals(BasicTypeInfo.CHAR_TYPE_INFO, field.type);
+ Assert.assertEquals(BasicTypeInfo.CHAR_TYPE_INFO, field.getTypeInformation());
} else if (name.equals("f0")) {
- Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, field.type);
+ Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, field.getTypeInformation());
} else if (name.equals("f1")) {
- Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, field.type);
+ Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, field.getTypeInformation());
} else if (name.equals("f2")) {
- Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.type);
+ Assert.assertEquals(BasicTypeInfo.LONG_TYPE_INFO, field.getTypeInformation());
} else {
Assert.fail("Unexpected field "+field);
}
@@ -831,7 +831,7 @@ public class PojoTypeExtractionTest {
public void testRecursivePojo1() {
TypeInformation<?> ti = TypeExtractor.createTypeInfo(RecursivePojo1.class);
Assert.assertTrue(ti instanceof PojoTypeInfo);
- Assert.assertEquals(GenericTypeInfo.class, ((PojoTypeInfo) ti).getPojoFieldAt(0).type.getClass());
+ Assert.assertEquals(GenericTypeInfo.class, ((PojoTypeInfo) ti).getPojoFieldAt(0).getTypeInformation().getClass());
}
@Test
@@ -839,8 +839,8 @@ public class PojoTypeExtractionTest {
TypeInformation<?> ti = TypeExtractor.createTypeInfo(RecursivePojo2.class);
Assert.assertTrue(ti instanceof PojoTypeInfo);
PojoField pf = ((PojoTypeInfo) ti).getPojoFieldAt(0);
- Assert.assertTrue(pf.type instanceof TupleTypeInfo);
- Assert.assertEquals(GenericTypeInfo.class, ((TupleTypeInfo) pf.type).getTypeAt(0).getClass());
+ Assert.assertTrue(pf.getTypeInformation() instanceof TupleTypeInfo);
+ Assert.assertEquals(GenericTypeInfo.class, ((TupleTypeInfo) pf.getTypeInformation()).getTypeAt(0).getClass());
}
@Test
@@ -848,8 +848,8 @@ public class PojoTypeExtractionTest {
TypeInformation<?> ti = TypeExtractor.createTypeInfo(RecursivePojo3.class);
Assert.assertTrue(ti instanceof PojoTypeInfo);
PojoField pf = ((PojoTypeInfo) ti).getPojoFieldAt(0);
- Assert.assertTrue(pf.type instanceof PojoTypeInfo);
- Assert.assertEquals(GenericTypeInfo.class, ((PojoTypeInfo) pf.type).getPojoFieldAt(0).type.getClass());
+ Assert.assertTrue(pf.getTypeInformation() instanceof PojoTypeInfo);
+ Assert.assertEquals(GenericTypeInfo.class, ((PojoTypeInfo) pf.getTypeInformation()).getPojoFieldAt(0).getTypeInformation().getClass());
}
public static class FooBarPojo {
http://git-wip-us.apache.org/repos/asf/flink/blob/b49b1552/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
index d27a82b..7f0cf5b 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/type/extractor/TypeExtractorTest.java
@@ -66,6 +66,8 @@ import org.apache.hadoop.io.Writable;
import org.junit.Assert;
import org.junit.Test;
+import javax.xml.bind.TypeConstraintException;
+
public class TypeExtractorTest {
@@ -1237,7 +1239,7 @@ public class TypeExtractorTest {
TypeInformation<?> ti = TypeExtractor.getMapReturnTypes(function, (TypeInformation) TypeInfoParser.parse("org.apache.flink.api.java.type.extractor.TypeExtractorTest$CustomArrayObject[]"));
Assert.assertTrue(ti instanceof ObjectArrayTypeInfo<?, ?>);
- Assert.assertEquals(CustomArrayObject.class, ((ObjectArrayTypeInfo<?, ?>) ti).getComponentType());
+ Assert.assertEquals(CustomArrayObject.class, ((ObjectArrayTypeInfo<?, ?>) ti).getComponentInfo().getTypeClass());
}
@SuppressWarnings({ "rawtypes", "unchecked" })