Posted to issues@flink.apache.org by Xazax-hun <gi...@git.apache.org> on 2016/07/07 10:41:10 UTC

[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

GitHub user Xazax-hun opened a pull request:

    https://github.com/apache/flink/pull/2211

    [WIP][FLINK-3599] Code generation for PojoSerializer and PojoComparator

    The current implementation of the serializers can be a
    performance bottleneck in some scenarios. These performance problems were
    also reported on the mailing list recently [1].
    
    For example, the PojoSerializer accesses fields through reflection, which is slow [2].
    
    For the complete proposal see [3].
    
    This pull request implements code generation support for PojoComparators and PojoSerializers. On my machine I measured about a 10% performance improvement for the WordCountPojo example. This pull request does not yet implement distribution of the generated code to the task managers.
    
    [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Tuple-performance-and-the-curious-JIT-compiler-td10666.html
    
    [2] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java#L369
    
    [3] https://docs.google.com/document/d/1VC8lCeErx9kI5lCMPiUn625PO0rxR-iKlVqtt3hkVnk
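
To make the motivation in the description concrete, here is a minimal sketch of the two access styles it contrasts: a reflective field read versus the plain field read that generated code can use. `WordCountPojoSketch` and `FieldAccessSketch` are hypothetical names invented for this illustration and are not part of the pull request.

```java
import java.lang.reflect.Field;

// Hypothetical POJO, used only for this illustration.
final class WordCountPojoSketch {
    public String word;
    public int count;
}

final class FieldAccessSketch {
    // Reflective read: goes through a Field handle and boxes the int into an Integer.
    // A serializer would normally cache the Field; it is looked up here only to keep the sketch short.
    static Object readReflectively(Object pojo, String fieldName) {
        try {
            Field field = pojo.getClass().getField(fieldName);
            return field.get(pojo);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    // What generated code can emit instead: a statically typed field read, no boxing.
    static int readDirectly(WordCountPojoSketch pojo) {
        return pojo.count;
    }
}
```

Both methods return the same value for the same object; the difference is only in how the field is reached.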


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/Xazax-hun/flink serializer_codegen

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2211.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2211
    
----
commit 6263ebe496ed7a0ac9ca9df35ffcdb8633519944
Author: Gabor Horvath <xa...@gmail.com>
Date:   2016-04-17T13:40:33Z

    Implement PojoSerializer and PojoComparator generators.

commit be698b44453f10add284db1c5dee24f719a87902
Author: Gabor Horvath <xa...@gmail.com>
Date:   2016-07-03T13:58:41Z

    Migrate code generation templates from string literals to files.

commit d8c63a1749a439907ef6bfbdb2da1962df7b61d3
Author: Gabor Horvath <xa...@gmail.com>
Date:   2016-07-06T11:23:29Z

    Fix a bunch of test failures.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r77990329
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerGenerator.java ---
    @@ -0,0 +1,259 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *	 http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils.runtime;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import java.io.IOException;
    +import java.lang.reflect.Constructor;
    +import java.lang.reflect.Field;
    +import java.lang.reflect.Modifier;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +import static org.apache.flink.api.java.typeutils.PojoTypeInfo.accessStringForField;
    +import static org.apache.flink.api.java.typeutils.PojoTypeInfo.modifyStringForField;
    +
    +public final class PojoSerializerGenerator<T> {
    +	private static final String packageName = "org.apache.flink.api.java.typeutils.runtime.generated";
    +
    +	private final Class<T> clazz;
    +	private final Field[] refFields;
    +	private final TypeSerializer<?>[] fieldSerializers;
    +	private final ExecutionConfig config;
    +	private String code;
    +
    +	public PojoSerializerGenerator(
    +		Class<T> clazz,
    +		TypeSerializer<?>[] fields,
    +		Field[] reflectiveFields,
    +		ExecutionConfig config) {
    +		this.clazz = checkNotNull(clazz);
    +		this.refFields = checkNotNull(reflectiveFields);
    +		this.fieldSerializers = checkNotNull(fields);
    +		this.config = checkNotNull(config);
    +		for (int i = 0; i < this.refFields.length; i++) {
    +			this.refFields[i].setAccessible(true);
    +		}
    +	}
    +
    +	public TypeSerializer<T> createSerializer()  {
    +		final String className = clazz.getCanonicalName().replace('.', '_') + "_GeneratedSerializer";
    +		final String fullClassName = packageName + "." + className;
    +		Class<?> serializerClazz;
    +		code = InstantiationUtil.getCodeForCachedClass(fullClassName);
    +		if (code == null) {
    +			generateCode(className);
    +		}
    +		if(config.isWrapGeneratedClassesEnabled()) {
    +			return new GenTypeSerializerProxy<>(clazz, fullClassName, code, fieldSerializers, config);
    +		}
    +		try {
    +			serializerClazz = InstantiationUtil.compile(clazz.getClassLoader(), fullClassName, code);
    +		}
    +		catch (Exception e) {
    +			throw new RuntimeException("Unable to generate serializer: " + className, e);
    +		}
    +		Constructor<?>[] ctors = serializerClazz.getConstructors();
    +		assert ctors.length == 1;
    +		try {
    +			return (TypeSerializer<T>) ctors[0].newInstance(new Object[]{clazz, fieldSerializers, config});
    --- End diff --
    
    varargs
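
A minimal sketch of what the one-word comment above presumably suggests: `Constructor.newInstance` is declared with a varargs parameter, so the arguments can be passed directly instead of being wrapped in `new Object[]{...}`. `VarargsSketch` and its nested `Target` class are hypothetical stand-ins for the generated serializer.

```java
import java.lang.reflect.Constructor;

final class VarargsSketch {
    // Hypothetical stand-in for a generated class with a single public constructor.
    public static final class Target {
        final String name;
        final int size;

        public Target(String name, int size) {
            this.name = name;
            this.size = size;
        }
    }

    static Target instantiate(String name, int size) {
        try {
            Constructor<?> ctor = Target.class.getConstructors()[0];
            // Varargs call: the compiler builds the Object[] (and boxes the int) for us.
            return (Target) ctor.newInstance(name, size);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Unable to instantiate " + Target.class.getName(), e);
        }
    }
}
```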



[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r78001809
  
    --- Diff: flink-core/src/main/resources/PojoSerializerTemplate.ftl ---
    @@ -0,0 +1,372 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *	 http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package ${packageName};
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +import java.lang.reflect.Modifier;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedHashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Objects;
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.java.typeutils.TypeExtractor;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +
    +public final class ${className} extends TypeSerializer {
    +	private static byte IS_NULL = 1;
    +	private static byte NO_SUBCLASS = 2;
    +	private static byte IS_SUBCLASS = 4;
    +	private static byte IS_TAGGED_SUBCLASS = 8;
    +	private int numFields;
    +	private ExecutionConfig executionConfig;
    +	private Map<Class, TypeSerializer> subclassSerializerCache;
    +	private final Map<Class, Integer> registeredClasses;
    +	private final TypeSerializer[] registeredSerializers;
    +	Class clazz;
    +	<#list members as m>
    +	${m}
    +	</#list>
    +	public ${className}(Class clazz, TypeSerializer[] serializerFields, ExecutionConfig e) {
    +		this.clazz = clazz;
    +		executionConfig = e;
    +		this.numFields = serializerFields.length;
    +		LinkedHashSet<Class> registeredPojoTypes = executionConfig.getRegisteredPojoTypes();
    +		subclassSerializerCache = new HashMap<Class, TypeSerializer>();
    +		List<Class> cleanedTaggedClasses = new ArrayList<Class>(registeredPojoTypes.size());
    +		for (Class registeredClass: registeredPojoTypes) {
    +			if (registeredClass.equals(clazz)) {
    +				continue;
    +			}
    +			if (!clazz.isAssignableFrom(registeredClass)) {
    +				continue;
    +			}
    +			cleanedTaggedClasses.add(registeredClass);
    +		}
    +		this.registeredClasses = new LinkedHashMap<Class, Integer>(cleanedTaggedClasses.size());
    +		registeredSerializers = new TypeSerializer[cleanedTaggedClasses.size()];
    +		int id = 0;
    +		for (Class registeredClass: cleanedTaggedClasses) {
    +			this.registeredClasses.put(registeredClass, id);
    +			TypeInformation typeInfo = TypeExtractor.createTypeInfo(registeredClass);
    +			registeredSerializers[id] = typeInfo.createSerializer(executionConfig);
    +			id++;
    +		}
    +		<#list initMembers as m>
    +		${m}
    +		</#list>
    +	}
    +	private TypeSerializer getSubclassSerializer(Class subclass) {
    +		TypeSerializer result = (TypeSerializer)subclassSerializerCache.get(subclass);
    +		if (result == null) {
    +			TypeInformation typeInfo = TypeExtractor.createTypeInfo(subclass);
    +			result = typeInfo.createSerializer(executionConfig);
    +			subclassSerializerCache.put(subclass, result);
    +		}
    +		return result;
    +	}
    +	public boolean isImmutableType() { return false; }
    +	public ${className} duplicate() {
    +		boolean stateful = false;
    +		TypeSerializer[] duplicateFieldSerializers = new TypeSerializer[numFields];
    +		<#list duplicateSerializers as ds>
    +		${ds}
    +		</#list>
    +		if (stateful) {
    +			return new ${className}(clazz, duplicateFieldSerializers, executionConfig);
    +		} else {
    +			return this;
    +		}
    +	}
    +	public ${typeName} createInstance() {
    +		if (clazz.isInterface() || Modifier.isAbstract(clazz.getModifiers())) {
    +			return null;
    +		}
    +		try {
    +			${typeName} t = (${typeName})clazz.newInstance();
    --- End diff --
    
    You can generate `new ${typeName}` here, which avoids reflection. Also, the above checks can be performed at code generation time.
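
One way the suggestion above could look, as a sketch only: the generator inspects the class once and emits either `return null;` or a direct constructor call, so the generated `createInstance()` contains neither the interface/abstract check nor `clazz.newInstance()`. `CreateInstanceGenSketch` and `createInstanceBody` are hypothetical names; the sketch assumes the type has an accessible no-argument constructor and a usable canonical name.

```java
import java.lang.reflect.Modifier;

final class CreateInstanceGenSketch {
    // Returns source text for the start of the generated createInstance() body.
    static String createInstanceBody(Class<?> clazz) {
        // Decided once, at code generation time, instead of on every createInstance() call.
        if (clazz.isInterface() || Modifier.isAbstract(clazz.getModifiers())) {
            return "return null;";
        }
        String typeName = clazz.getCanonicalName();
        // Direct constructor call in the generated source: no reflection at runtime.
        return typeName + " t = new " + typeName + "();";
    }
}
```

For an interface such as `Runnable` the sketch emits `return null;`, while for a concrete class it emits a `new` expression using the canonical type name.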



[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r78138160
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerGenerator.java ---
    @@ -0,0 +1,259 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *	 http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils.runtime;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import java.io.IOException;
    +import java.lang.reflect.Constructor;
    +import java.lang.reflect.Field;
    +import java.lang.reflect.Modifier;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +import static org.apache.flink.api.java.typeutils.PojoTypeInfo.accessStringForField;
    +import static org.apache.flink.api.java.typeutils.PojoTypeInfo.modifyStringForField;
    +
    +public final class PojoSerializerGenerator<T> {
    +	private static final String packageName = "org.apache.flink.api.java.typeutils.runtime.generated";
    +
    +	private final Class<T> clazz;
    +	private final Field[] refFields;
    +	private final TypeSerializer<?>[] fieldSerializers;
    +	private final ExecutionConfig config;
    +	private String code;
    +
    +	public PojoSerializerGenerator(
    +		Class<T> clazz,
    +		TypeSerializer<?>[] fields,
    +		Field[] reflectiveFields,
    +		ExecutionConfig config) {
    +		this.clazz = checkNotNull(clazz);
    +		this.refFields = checkNotNull(reflectiveFields);
    +		this.fieldSerializers = checkNotNull(fields);
    +		this.config = checkNotNull(config);
    +		for (int i = 0; i < this.refFields.length; i++) {
    +			this.refFields[i].setAccessible(true);
    +		}
    +	}
    +
    +	public TypeSerializer<T> createSerializer()  {
    +		final String className = clazz.getCanonicalName().replace('.', '_') + "_GeneratedSerializer";
    +		final String fullClassName = packageName + "." + className;
    +		Class<?> serializerClazz;
    +		code = InstantiationUtil.getCodeForCachedClass(fullClassName);
    +		if (code == null) {
    +			generateCode(className);
    +		}
    +		if(config.isWrapGeneratedClassesEnabled()) {
    +			return new GenTypeSerializerProxy<>(clazz, fullClassName, code, fieldSerializers, config);
    +		}
    +		try {
    +			serializerClazz = InstantiationUtil.compile(clazz.getClassLoader(), fullClassName, code);
    +		}
    +		catch (Exception e) {
    +			throw new RuntimeException("Unable to generate serializer: " + className, e);
    +		}
    +		Constructor<?>[] ctors = serializerClazz.getConstructors();
    +		assert ctors.length == 1;
    +		try {
    +			return (TypeSerializer<T>) ctors[0].newInstance(new Object[]{clazz, fieldSerializers, config});
    +		} catch (Exception e) {
    +			throw new RuntimeException("Unable to instantiate serializer: " + className, e);
    +		}
    +
    +	}
    +
    +	private void generateCode(String className) {
    +		assert fieldSerializers.length > 0;
    +		String typeName = clazz.getCanonicalName();
    +		StringBuilder members = new StringBuilder();
    +		for (int i = 0; i < fieldSerializers.length; ++i) {
    +			members.append(String.format("final TypeSerializer f%d;\n", i));
    +		}
    +		StringBuilder initMembers = new StringBuilder();
    +		for (int i = 0; i < fieldSerializers.length; ++i) {
    +			initMembers.append(String.format("f%d = serializerFields[%d];\n", i, i));
    +		}
    +		StringBuilder createFields = new StringBuilder();
    +		for (int i = 0; i < fieldSerializers.length; ++i) {
    +			createFields.append(String.format("((" + typeName + ")t)." + modifyStringForField(refFields[i],
    +				"f%d.createInstance()") + ";\n", i));
    +		}
    +		StringBuilder copyFields = new StringBuilder();
    +		copyFields.append("Object value;\n");
    +		for (int i = 0; i < fieldSerializers.length; ++i) {
    +			if (refFields[i].getType().isPrimitive()) {
    +				copyFields.append(String.format("((" + typeName + ")target)." + modifyStringForField(refFields[i],
    +					"f%d.copy(((" + typeName + ")from)." + accessStringForField(refFields[i])) + ");\n", i));
    +			} else {
    +				copyFields.append(String.format(
    +					"value = ((" + typeName + ")from)." + accessStringForField(refFields[i]) + ";\n" +
    +					"if (value != null) {\n" +
    +					"	((" + typeName + ")target)." + modifyStringForField(refFields[i], "f%d.copy(value)") + ";\n" +
    +					"} else {\n" +
    +					"	((" + typeName + ")target)." + modifyStringForField(refFields[i], "null") + ";\n" +
    +					"}\n", i));
    +			}
    +		}
    +		StringBuilder reuseCopyFields = new StringBuilder();
    +		reuseCopyFields.append("Object value;\n");
    +		reuseCopyFields.append("Object reuseValue;\n");
    +		reuseCopyFields.append("Object copy;\n");
    +		for (int i = 0; i < fieldSerializers.length; ++i) {
    +			if (refFields[i].getType().isPrimitive()) {
    +				reuseCopyFields.append(String.format("((" + typeName + ")reuse)." + modifyStringForField(refFields[i],
    +					"f%d.copy(((" + typeName + ")from)." + accessStringForField(refFields[i]) + ")") + ";\n", i));
    +			} else {
    +				reuseCopyFields.append(String.format(
    +					"value = ((" + typeName + ")from)." + accessStringForField(refFields[i]) + ";\n" +
    +					"if (value != null) {\n" +
    +					"	reuseValue = ((" + typeName + ")reuse)." + accessStringForField(refFields[i]) + ";\n" +
    +					"	if (reuseValue != null) {\n" +
    --- End diff --
    
    However, in the primitive case, you don't need to call the `copy` method of `f%d`, you can just do an assignment. (Generally, the main purpose of `copy` is to solve the problem of how to make deep copies.)
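
A sketch of how the generator could act on this: for primitive fields it emits a plain assignment, and only for reference types does it route the value through the field serializer's `copy`. `CopyFieldGenSketch` and `copyStatement` are hypothetical; the sketch assumes public fields and leaves out the null handling that the code under review performs for reference types.

```java
final class CopyFieldGenSketch {
    // Builds one generated statement that copies field `fieldName` from `from` to `target`.
    static String copyStatement(String typeName, String fieldName, Class<?> fieldType, int index) {
        String from = "((" + typeName + ") from)." + fieldName;
        String target = "((" + typeName + ") target)." + fieldName;
        if (fieldType.isPrimitive()) {
            // Primitives have value semantics, so an assignment already is a full copy.
            return target + " = " + from + ";";
        }
        // Reference types may need a deep copy, which the field serializer f<index> provides.
        return target + " = (" + fieldType.getCanonicalName() + ") f" + index + ".copy(" + from + ");";
    }
}
```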



[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by Xazax-hun <gi...@git.apache.org>.
Github user Xazax-hun commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r78086011
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---
    @@ -419,4 +481,61 @@ public String toString() {
     			return "NamedFlatFieldDescriptor [name="+fieldName+" position="+getPosition()+" typeInfo="+getType()+"]";
     		}
     	}
    +
    +	public static String accessStringForField(Field f) {
    +		String fieldName = f.getName();
    +		if (Modifier.isPublic(f.getModifiers())) {
    +			return fieldName;
    +		}
    +		String getterName = "get" + Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
    +		Class parentClazz = f.getDeclaringClass();
    +		try {
    +			parentClazz.getMethod(getterName, new Class[0]);
    +		} catch (NoSuchMethodException e) {
    +			// No getter, it might be a scala class.
    +			return fieldName + "()";
    +		}
    +		return getterName + "()";
    +	}
    +
    +	public static String modifyStringForField(Field f, String arg) {
    +		String fieldName = f.getName();
    +		if (Modifier.isPublic(f.getModifiers())) {
    +			if (f.getType().isPrimitive()) {
    +				return f.getName() + " = (" +
    +					primitiveBoxedClasses.get(f.getType().getCanonicalName()).getCanonicalName() + ")" + arg;
    --- End diff --
    
    Because arg usually refers to an object. I will add a comment to clarify this.
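
A small sketch of the point being made, as I read it: when the right-hand side is typed as `Object`, casting it to the boxed type lets the assignment to a primitive field rely on auto-unboxing. `BoxedCastSketch` is a hypothetical name, and `assignment` only mirrors the shape of the string built in the diff above.

```java
final class BoxedCastSketch {
    // Runtime view: an Object that holds an Integer is cast to the wrapper type,
    // and the assignment to an int then auto-unboxes it.
    static int assignFromObject(Object arg) {
        int field = (Integer) arg;
        return field;
    }

    // Generator view: emit "<field> = (<boxed type>)<arg>" for a primitive field.
    static String assignment(String fieldName, Class<?> boxedType, String arg) {
        return fieldName + " = (" + boxedType.getCanonicalName() + ")" + arg;
    }
}
```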



[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r77992445
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---
    @@ -419,4 +481,61 @@ public String toString() {
     			return "NamedFlatFieldDescriptor [name="+fieldName+" position="+getPosition()+" typeInfo="+getType()+"]";
     		}
     	}
    +
    +	public static String accessStringForField(Field f) {
    +		String fieldName = f.getName();
    +		if (Modifier.isPublic(f.getModifiers())) {
    +			return fieldName;
    +		}
    +		String getterName = "get" + Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
    +		Class parentClazz = f.getDeclaringClass();
    +		try {
    +			parentClazz.getMethod(getterName, new Class[0]);
    +		} catch (NoSuchMethodException e) {
    +			// No getter, it might be a scala class.
    +			return fieldName + "()";
    +		}
    +		return getterName + "()";
    +	}
    +
    +	public static String modifyStringForField(Field f, String arg) {
    +		String fieldName = f.getName();
    +		if (Modifier.isPublic(f.getModifiers())) {
    +			if (f.getType().isPrimitive()) {
    +				return f.getName() + " = (" +
    +					primitiveBoxedClasses.get(f.getType().getCanonicalName()).getCanonicalName() + ")" + arg;
    +			} else {
    +				return f.getName() + " = (" + f.getType().getCanonicalName() + ")" + arg;
    +			}
    +		}
    +		String setterName = "set" + Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
    +		Class parentClazz = f.getDeclaringClass();
    --- End diff --
    
    `<?>`
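
The comment above asks for `Class<?>` in place of the raw `Class` type. A minimal sketch of the same kind of lookup written that way follows; `WildcardClassSketch` and `hasPublicNoArgMethod` are hypothetical names.

```java
final class WildcardClassSketch {
    // With Class<?> instead of raw Class, the varargs getMethod call compiles
    // without raw-type warnings and needs no explicit `new Class[0]`.
    static boolean hasPublicNoArgMethod(Class<?> type, String methodName) {
        try {
            type.getMethod(methodName);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }
}
```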



[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r77994444
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---
    @@ -419,4 +481,61 @@ public String toString() {
     			return "NamedFlatFieldDescriptor [name="+fieldName+" position="+getPosition()+" typeInfo="+getType()+"]";
     		}
     	}
    +
    +	public static String accessStringForField(Field f) {
    +		String fieldName = f.getName();
    +		if (Modifier.isPublic(f.getModifiers())) {
    +			return fieldName;
    +		}
    +		String getterName = "get" + Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
    +		Class parentClazz = f.getDeclaringClass();
    +		try {
    +			parentClazz.getMethod(getterName, new Class[0]);
    +		} catch (NoSuchMethodException e) {
    +			// No getter, it might be a scala class.
    +			return fieldName + "()";
    +		}
    +		return getterName + "()";
    +	}
    +
    +	public static String modifyStringForField(Field f, String arg) {
    +		String fieldName = f.getName();
    +		if (Modifier.isPublic(f.getModifiers())) {
    +			if (f.getType().isPrimitive()) {
    +				return f.getName() + " = (" +
    +					primitiveBoxedClasses.get(f.getType().getCanonicalName()).getCanonicalName() + ")" + arg;
    +			} else {
    +				return f.getName() + " = (" + f.getType().getCanonicalName() + ")" + arg;
    +			}
    +		}
    +		String setterName = "set" + Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
    +		Class parentClazz = f.getDeclaringClass();
    +		try {
    +			parentClazz.getMethod(setterName, f.getType());
    +		} catch (NoSuchMethodException e) {
    +			// No getter, it might be a scala class.
    --- End diff --
    
    Same as above.



[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r69953281
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---
    @@ -419,4 +470,47 @@ public String toString() {
     			return "NamedFlatFieldDescriptor [name="+fieldName+" position="+getPosition()+" typeInfo="+getType()+"]";
     		}
     	}
    +
    +	public static String accesStringForField(Field f) {
    --- End diff --
    
    `acces` -> `access`?



[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r78149815
  
    --- Diff: flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerTest.java ---
    @@ -50,7 +50,6 @@
     	@Override
     	protected TypeSerializer<TestUserClass> createSerializer() {
     		TypeSerializer<TestUserClass> serializer = type.createSerializer(new ExecutionConfig());
    -		assert(serializer instanceof PojoSerializer);
    --- End diff --
    
    If I understand correctly, this test originally tested `PojoSerializer`, and now it is testing the generated serializers only. I think it would be good to tweak it to also test `PojoSerializer`. You could probably do this by creating a subclass of this class, where you override `createSerializer`, and pass an `ExecutionConfig` with codegen disabled.
    
    Additionally, maybe you could also add a test that checks compatibility between `PojoSerializer` and the generated serializers.
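
The subclassing idea above can be sketched without any Flink classes. Everything below is a hypothetical stand-in: `SketchConfig` plays the role of `ExecutionConfig`, `SketchSerializers.create` the role of `type.createSerializer(...)`, and the default of code generation being on is an assumption of the sketch.

```java
// Hypothetical stand-in for ExecutionConfig; code generation defaults to on in this sketch.
final class SketchConfig {
    private boolean codeGeneration = true;

    void disableCodeGeneration() { codeGeneration = false; }

    boolean isCodeGenerationEnabled() { return codeGeneration; }
}

// Hypothetical stand-in for the serializer factory: reports which kind it would build.
final class SketchSerializers {
    static String create(SketchConfig config) {
        return config.isCodeGenerationEnabled() ? "generated" : "reflective";
    }
}

// Base test: exercises whatever createSerializer() returns.
class SerializerTestBaseSketch {
    protected String createSerializer() {
        return SketchSerializers.create(new SketchConfig());
    }

    String serializerUnderTest() {
        return createSerializer();
    }
}

// Subclass: inherits all test logic, but swaps in a config with codegen disabled.
final class ReflectiveSerializerTestSketch extends SerializerTestBaseSketch {
    @Override
    protected String createSerializer() {
        SketchConfig config = new SketchConfig();
        config.disableCodeGeneration();
        return SketchSerializers.create(config);
    }
}
```

The base class then covers the generated serializers and the subclass covers the reflective one, with no duplicated test bodies.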



[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r77984130
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---
    @@ -455,6 +459,33 @@ public boolean isForceAvroEnabled() {
     		return forceAvro;
     	}
     
    +    /**
    +     * Force Flink to use the generated serializers for POJOs.
    +     */
    +	public void enableCodeGeneration() {
    +		forceCodeGeneration = true;
    +	}
    +
    +	public void disableCodeGeneration() {
    +		forceCodeGeneration = false;
    +	}
    +
    +	public boolean isCodeGenerationEnabled() {
    +		return forceCodeGeneration;
    +	}
    +
    +	public void enableWrapGeneratedClasses() {
    --- End diff --
    
    I'm not sure if this is still needed, but I might be missing something. Shouldn't this always be enabled?



[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r69952548
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---
    @@ -70,10 +75,21 @@
     	private static final Pattern PATTERN_NESTED_FIELDS = Pattern.compile(REGEX_NESTED_FIELDS);
     	private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
     
    +	private static final Map<Class<?>, Class<? extends TypeSerializer>> customSerializers = new HashMap<>();
    +	private static final Map<Class<?>, Class<? extends TypeComparator>> customComparators = new HashMap<>();
    +
     	private final PojoField[] fields;
     	
     	private final int totalFields;
     
    +	public static <C, S extends TypeSerializer<C>> void registerCustomSerializer(Class<C> c, Class<S> s) {
    +		customSerializers.put(c, s);
    --- End diff --
    
    Should we also do a preconditions check here (and in `registerCustomComparator`) that `s` has a single public constructor? Is the assertion message obvious in `createSerializer`?
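
A sketch of the kind of check being asked about, using only the standard library so that it stands alone; in Flink itself a `Preconditions` helper would be the natural place for it. `RegistrationCheckSketch` and `checkSinglePublicConstructor` are hypothetical names.

```java
final class RegistrationCheckSketch {
    // Fails fast at registration time with an explicit message, instead of
    // tripping a bare assertion later when the serializer is instantiated.
    static <S> Class<S> checkSinglePublicConstructor(Class<S> s) {
        int count = s.getConstructors().length;
        if (count != 1) {
            throw new IllegalArgumentException(
                s.getName() + " must declare exactly one public constructor, but declares " + count);
        }
        return s;
    }
}
```

Returning the class makes it possible to wrap the check around the argument at the call site, for example `customSerializers.put(c, checkSinglePublicConstructor(s))`.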



[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r78708846
  
    --- Diff: flink-core/pom.xml ---
    @@ -76,6 +76,18 @@ under the License.
     			<artifactId>commons-collections</artifactId>
     		</dependency>
     		
    +		<dependency>
    +			<groupId>org.codehaus.janino</groupId>
    +			<artifactId>janino</artifactId>
    +			<version>2.7.8</version>
    +		</dependency>
    +
    +		<dependency>
    +			<groupId>org.freemarker</groupId>
    +			<artifactId>freemarker</artifactId>
    +			<version>2.3.24-incubating</version>
    +		</dependency>
    +
    --- End diff --
    
    You can update Janino to `3.0.1` and FreeMarker to `2.3.25-incubating`.


[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r78750651
  
    --- Diff: flink-examples/flink-examples-streaming/pom.xml ---
    @@ -84,6 +84,11 @@ under the License.
     			<scope>test</scope>
     			<type>test-jar</type>
     		</dependency>
    +		<dependency>
    --- End diff --
    
    Fixed in 456da8a63568222c3a393177d7c4705cd4174103


[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r77993022
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---
    @@ -419,4 +481,61 @@ public String toString() {
     			return "NamedFlatFieldDescriptor [name="+fieldName+" position="+getPosition()+" typeInfo="+getType()+"]";
     		}
     	}
    +
    +	public static String accessStringForField(Field f) {
    +		String fieldName = f.getName();
    +		if (Modifier.isPublic(f.getModifiers())) {
    +			return fieldName;
    +		}
    +		String getterName = "get" + Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
    +		Class parentClazz = f.getDeclaringClass();
    +		try {
    +			parentClazz.getMethod(getterName, new Class[0]);
    +		} catch (NoSuchMethodException e) {
    +			// No getter, it might be a scala class.
    --- End diff --
    
    Is this possible at this point? The rules of being a POJO should not allow this, right?
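
To make the branch in question concrete, here is a standalone sketch that copies the getter lookup from the diff and runs it on three hypothetical classes. Whether Flink's POJO analysis ever admits a class shaped like `ScalaStyle` is the open question above; the sketch only shows what the fallback would return for one.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

final class AccessStringDemo {

    // Standalone copy of the lookup under review, for illustration only.
    static String accessStringForField(Field f) {
        String fieldName = f.getName();
        if (Modifier.isPublic(f.getModifiers())) {
            return fieldName; // direct field access
        }
        String getterName = "get" + Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
        try {
            f.getDeclaringClass().getMethod(getterName);
        } catch (NoSuchMethodException e) {
            return fieldName + "()"; // fallback: accessor named like the field
        }
        return getterName + "()";
    }

    // Hypothetical example classes, not taken from the pull request.
    static class PublicField {
        public int count;
    }

    static class JavaBean {
        private int count;
        public int getCount() { return count; }
    }

    static class ScalaStyle {
        private int count;
        public int count() { return count; }
    }

    static String accessFor(Class<?> c) throws NoSuchFieldException {
        return accessStringForField(c.getDeclaredField("count"));
    }

    public static void main(String[] args) throws NoSuchFieldException {
        System.out.println(accessFor(PublicField.class)); // count
        System.out.println(accessFor(JavaBean.class));    // getCount()
        System.out.println(accessFor(ScalaStyle.class));  // count()
    }
}
```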


[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r77995491
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenTypeComparatorProxy.java ---
    @@ -0,0 +1,174 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils.runtime;
    +
    +import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
    +import org.apache.flink.api.common.typeutils.TypeComparator;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.MemorySegment;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.lang.reflect.Constructor;
    +import java.util.List;
    +
    +public class GenTypeComparatorProxy<T> extends CompositeTypeComparator<T> implements java.io.Serializable {
    --- End diff --
    
    Please add javadoc.


[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r77988402
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---
    @@ -393,6 +443,18 @@ public void addComparatorField(int fieldId, TypeComparator<?> comparator) {
     				keyFields.size() == fieldComparators.size(),
     				"Number of key fields and field comparators is not equal.");
     
    +			Tuple2<ArrayList<Integer>, Class> custCompKey = new Tuple2(keyFieldIds, getTypeClass());
    +			if (customComparators.containsKey(custCompKey)) {
    +				return InstantiationUtil.instantiate(customComparators.get(custCompKey));
    +			}
    +
    +			if (config.isCodeGenerationEnabled()) {
    +				return new PojoComparatorGenerator<T>(keyFields.toArray(new Field[keyFields.size()]),
    +					fieldComparators.toArray(new TypeComparator[fieldComparators.size()]), createSerializer
    +					(config), getTypeClass(), keyFieldIds.toArray(new Integer[keyFields.size()]), config)
    --- End diff --
    
    I think it's less verbose to use the parameterless toArray, and cast to the appropriate array type.
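
One caveat worth checking before adopting this suggestion: for the standard collections, the parameterless `toArray()` returns an array whose runtime type is `Object[]`, so casting it to a more specific array type compiles but fails at runtime. The sketch below uses a plain `ArrayList<String>` as a stand-in for `keyFields`; the class and method names are hypothetical. Passing a zero-length typed array is a common shorter alternative to pre-sizing it.

```java
import java.util.ArrayList;
import java.util.List;

final class ToArrayDemo {

    /** Typed overload: the returned array has runtime type String[]. */
    static String[] typedCopy(List<String> names) {
        return names.toArray(new String[0]);
    }

    /** Returns true if casting the result of the parameterless toArray() fails. */
    static boolean castOfUntypedCopyFails(List<String> names) {
        try {
            String[] cast = (String[]) names.toArray(); // runtime type is Object[]
            return cast == null; // not reached for ArrayList
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> names = new ArrayList<>();
        names.add("a");
        names.add("b");
        System.out.println(typedCopy(names).length);       // 2
        System.out.println(castOfUntypedCopyFails(names)); // true
    }
}
```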


[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r78136985
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerGenerator.java ---
    @@ -0,0 +1,259 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *	 http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils.runtime;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import java.io.IOException;
    +import java.lang.reflect.Constructor;
    +import java.lang.reflect.Field;
    +import java.lang.reflect.Modifier;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +import static org.apache.flink.api.java.typeutils.PojoTypeInfo.accessStringForField;
    +import static org.apache.flink.api.java.typeutils.PojoTypeInfo.modifyStringForField;
    +
    +public final class PojoSerializerGenerator<T> {
    +	private static final String packageName = "org.apache.flink.api.java.typeutils.runtime.generated";
    +
    +	private final Class<T> clazz;
    +	private final Field[] refFields;
    +	private final TypeSerializer<?>[] fieldSerializers;
    +	private final ExecutionConfig config;
    +	private String code;
    +
    +	public PojoSerializerGenerator(
    +		Class<T> clazz,
    +		TypeSerializer<?>[] fields,
    +		Field[] reflectiveFields,
    +		ExecutionConfig config) {
    +		this.clazz = checkNotNull(clazz);
    +		this.refFields = checkNotNull(reflectiveFields);
    +		this.fieldSerializers = checkNotNull(fields);
    +		this.config = checkNotNull(config);
    +		for (int i = 0; i < this.refFields.length; i++) {
    +			this.refFields[i].setAccessible(true);
    +		}
    +	}
    +
    +	public TypeSerializer<T> createSerializer()  {
    +		final String className = clazz.getCanonicalName().replace('.', '_') + "_GeneratedSerializer";
    +		final String fullClassName = packageName + "." + className;
    +		Class<?> serializerClazz;
    +		code = InstantiationUtil.getCodeForCachedClass(fullClassName);
    +		if (code == null) {
    +			generateCode(className);
    +		}
    +		if(config.isWrapGeneratedClassesEnabled()) {
    +			return new GenTypeSerializerProxy<>(clazz, fullClassName, code, fieldSerializers, config);
    +		}
    +		try {
    +			serializerClazz = InstantiationUtil.compile(clazz.getClassLoader(), fullClassName, code);
    +		}
    +		catch (Exception e) {
    +			throw new RuntimeException("Unable to generate serializer: " + className, e);
    +		}
    +		Constructor<?>[] ctors = serializerClazz.getConstructors();
    +		assert ctors.length == 1;
    +		try {
    +			return (TypeSerializer<T>) ctors[0].newInstance(new Object[]{clazz, fieldSerializers, config});
    +		} catch (Exception e) {
    +			throw new RuntimeException("Unable to instantiate serializer: " + className, e);
    +		}
    +
    +	}
    +
    +	private void generateCode(String className) {
    +		assert fieldSerializers.length > 0;
    +		String typeName = clazz.getCanonicalName();
    +		StringBuilder members = new StringBuilder();
    +		for (int i = 0; i < fieldSerializers.length; ++i) {
    +			members.append(String.format("final TypeSerializer f%d;\n", i));
    +		}
    +		StringBuilder initMembers = new StringBuilder();
    +		for (int i = 0; i < fieldSerializers.length; ++i) {
    +			initMembers.append(String.format("f%d = serializerFields[%d];\n", i, i));
    +		}
    +		StringBuilder createFields = new StringBuilder();
    +		for (int i = 0; i < fieldSerializers.length; ++i) {
    +			createFields.append(String.format("((" + typeName + ")t)." + modifyStringForField(refFields[i],
    +				"f%d.createInstance()") + ";\n", i));
    +		}
    +		StringBuilder copyFields = new StringBuilder();
    +		copyFields.append("Object value;\n");
    +		for (int i = 0; i < fieldSerializers.length; ++i) {
    +			if (refFields[i].getType().isPrimitive()) {
    +				copyFields.append(String.format("((" + typeName + ")target)." + modifyStringForField(refFields[i],
    +					"f%d.copy(((" + typeName + ")from)." + accessStringForField(refFields[i])) + ");\n", i));
    +			} else {
    +				copyFields.append(String.format(
    +					"value = ((" + typeName + ")from)." + accessStringForField(refFields[i]) + ";\n" +
    +					"if (value != null) {\n" +
    +					"	((" + typeName + ")target)." + modifyStringForField(refFields[i], "f%d.copy(value)") + ";\n" +
    +					"} else {\n" +
    +					"	((" + typeName + ")target)." + modifyStringForField(refFields[i], "null") + ";\n" +
    +					"}\n", i));
    +			}
    +		}
    +		StringBuilder reuseCopyFields = new StringBuilder();
    +		reuseCopyFields.append("Object value;\n");
    +		reuseCopyFields.append("Object reuseValue;\n");
    +		reuseCopyFields.append("Object copy;\n");
    +		for (int i = 0; i < fieldSerializers.length; ++i) {
    +			if (refFields[i].getType().isPrimitive()) {
    +				reuseCopyFields.append(String.format("((" + typeName + ")reuse)." + modifyStringForField(refFields[i],
    +					"f%d.copy(((" + typeName + ")from)." + accessStringForField(refFields[i]) + ")") + ";\n", i));
    +			} else {
    +				reuseCopyFields.append(String.format(
    +					"value = ((" + typeName + ")from)." + accessStringForField(refFields[i]) + ";\n" +
    +					"if (value != null) {\n" +
    +					"	reuseValue = ((" + typeName + ")reuse)." + accessStringForField(refFields[i]) + ";\n" +
    +					"	if (reuseValue != null) {\n" +
    --- End diff --
    
    Ah yes, sorry!


[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r77996133
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerGenerator.java ---
    @@ -0,0 +1,259 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *	 http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils.runtime;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import java.io.IOException;
    +import java.lang.reflect.Constructor;
    +import java.lang.reflect.Field;
    +import java.lang.reflect.Modifier;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +import static org.apache.flink.api.java.typeutils.PojoTypeInfo.accessStringForField;
    +import static org.apache.flink.api.java.typeutils.PojoTypeInfo.modifyStringForField;
    +
    +public final class PojoSerializerGenerator<T> {
    --- End diff --
    
    Please add Javadoc and an audience/stability annotation (probably `@Internal`).


[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by Xazax-hun <gi...@git.apache.org>.
Github user Xazax-hun commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r78087961
  
    --- Diff: flink-core/src/main/resources/PojoComparatorTemplate.ftl ---
    @@ -0,0 +1,195 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *	 http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package ${packageName};
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +import java.util.List;
    +import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
    +import org.apache.flink.api.common.typeutils.TypeComparator;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.MemorySegment;
    +import org.apache.flink.types.NullKeyFieldException;
    +import org.apache.flink.api.java.typeutils.runtime.TupleComparatorBase;
    +import org.apache.flink.api.java.typeutils.runtime.GenTypeComparatorProxy;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +public final class ${className} extends CompositeTypeComparator implements java.io.Serializable {
    --- End diff --
    
    Janino ignores generics. If I wanted generics, I would need to validate the generated code with a regular Java compiler as well to be sure I got it right, so I decided not to bother with it for now.


[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r78149877
  
    --- Diff: flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/PojoSubclassSerializerTest.java ---
    @@ -41,7 +41,6 @@
     		ExecutionConfig conf = new ExecutionConfig();
     		conf.registerPojoType(TestUserClass1.class);
     		TypeSerializer<TestUserClassBase> serializer = type.createSerializer(conf);
    -		assert(serializer instanceof PojoSerializer);
    --- End diff --
    
    Same here as above.


[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r77995790
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenTypeSerializerProxy.java ---
    @@ -0,0 +1,148 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils.runtime;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.lang.reflect.Constructor;
    +
    +public class GenTypeSerializerProxy<T> extends TypeSerializer<T> {
    --- End diff --
    
    Please add javadoc.


[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r78003691
  
    --- Diff: flink-core/src/main/resources/PojoSerializerTemplate.ftl ---
    @@ -0,0 +1,372 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *	 http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package ${packageName};
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +import java.lang.reflect.Modifier;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedHashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Objects;
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.java.typeutils.TypeExtractor;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +
    +public final class ${className} extends TypeSerializer {
    +	private static byte IS_NULL = 1;
    +	private static byte NO_SUBCLASS = 2;
    +	private static byte IS_SUBCLASS = 4;
    +	private static byte IS_TAGGED_SUBCLASS = 8;
    +	private int numFields;
    +	private ExecutionConfig executionConfig;
    +	private Map<Class, TypeSerializer> subclassSerializerCache;
    +	private final Map<Class, Integer> registeredClasses;
    +	private final TypeSerializer[] registeredSerializers;
    +	Class clazz;
    +	<#list members as m>
    +	${m}
    +	</#list>
    +	public ${className}(Class clazz, TypeSerializer[] serializerFields, ExecutionConfig e) {
    +		this.clazz = clazz;
    +		executionConfig = e;
    +		this.numFields = serializerFields.length;
    +		LinkedHashSet<Class> registeredPojoTypes = executionConfig.getRegisteredPojoTypes();
    +		subclassSerializerCache = new HashMap<Class, TypeSerializer>();
    +		List<Class> cleanedTaggedClasses = new ArrayList<Class>(registeredPojoTypes.size());
    +		for (Class registeredClass: registeredPojoTypes) {
    +			if (registeredClass.equals(clazz)) {
    +				continue;
    +			}
    +			if (!clazz.isAssignableFrom(registeredClass)) {
    +				continue;
    +			}
    +			cleanedTaggedClasses.add(registeredClass);
    +		}
    +		this.registeredClasses = new LinkedHashMap<Class, Integer>(cleanedTaggedClasses.size());
    +		registeredSerializers = new TypeSerializer[cleanedTaggedClasses.size()];
    +		int id = 0;
    +		for (Class registeredClass: cleanedTaggedClasses) {
    +			this.registeredClasses.put(registeredClass, id);
    +			TypeInformation typeInfo = TypeExtractor.createTypeInfo(registeredClass);
    +			registeredSerializers[id] = typeInfo.createSerializer(executionConfig);
    +			id++;
    +		}
    +		<#list initMembers as m>
    +		${m}
    +		</#list>
    +	}
    +	private TypeSerializer getSubclassSerializer(Class subclass) {
    +		TypeSerializer result = (TypeSerializer)subclassSerializerCache.get(subclass);
    +		if (result == null) {
    +			TypeInformation typeInfo = TypeExtractor.createTypeInfo(subclass);
    +			result = typeInfo.createSerializer(executionConfig);
    +			subclassSerializerCache.put(subclass, result);
    +		}
    +		return result;
    +	}
    +	public boolean isImmutableType() { return false; }
    +	public ${className} duplicate() {
    +		boolean stateful = false;
    +		TypeSerializer[] duplicateFieldSerializers = new TypeSerializer[numFields];
    +		<#list duplicateSerializers as ds>
    +		${ds}
    +		</#list>
    +		if (stateful) {
    +			return new ${className}(clazz, duplicateFieldSerializers, executionConfig);
    +		} else {
    +			return this;
    +		}
    +	}
    +	public ${typeName} createInstance() {
    +		if (clazz.isInterface() || Modifier.isAbstract(clazz.getModifiers())) {
    +			return null;
    +		}
    +		try {
    +			${typeName} t = (${typeName})clazz.newInstance();
    +			initializeFields(t);
    +			return t;
    +		}
    +		catch (Exception e) {
    +			throw new RuntimeException("Cannot instantiate class.", e);
    +		}
    +	}
    +	protected void initializeFields(${typeName} t) {
    +		<#list createFields as cf>
    +		${cf}
    +		</#list>
    +	}
    +	public ${typeName} copy(Object from) {
    +		if (from == null) return null;
    +		Class<?> actualType = from.getClass();
    +		${typeName} target;
    +		if (actualType == clazz) {
    +			try {
    +				target = (${typeName}) from.getClass().newInstance();
    +			}
    +			catch (Throwable t) {
    +				throw new RuntimeException("Cannot instantiate class.", t);
    +			}
    +			<#list copyFields as cf>
    +			${cf}
    +			</#list>
    +			return target;
    +		} else {
    +			TypeSerializer subclassSerializer = getSubclassSerializer(actualType);
    +			return (${typeName})subclassSerializer.copy(from);
    +		}
    +	}
    +	public ${typeName} copy(Object from, Object reuse) {
    +		if (from == null) return null;
    +		Class actualType = from.getClass();
    +		if (actualType == clazz) {
    --- End diff --
    
    Is this `actualType == clazz` check needed if the type is final?
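
A rough sketch of how a generator could answer this at code-generation time, assuming it is acceptable to decide from the class modifiers alone. The helper name `canSkipSubclassDispatch` and the example classes are hypothetical; the pull request does not contain them.

```java
import java.lang.reflect.Modifier;

final class FinalTypeCheckDemo {

    /**
     * A final class cannot have subclasses, so every non-null instance of it has
     * exactly that runtime class and the subclass-serializer branch is unreachable.
     */
    static boolean canSkipSubclassDispatch(Class<?> clazz) {
        return Modifier.isFinal(clazz.getModifiers());
    }

    // Hypothetical example types.
    static final class FinalPojo {
        public int x;
    }

    static class OpenPojo {
        public int x;
    }

    public static void main(String[] args) {
        System.out.println(canSkipSubclassDispatch(FinalPojo.class)); // true
        System.out.println(canSkipSubclassDispatch(OpenPojo.class));  // false
    }
}
```

With such a check, the template could emit only the direct field-copy path for final types and keep the `actualType == clazz` test for everything else.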


[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r78153259
  
    --- Diff: flink-core/src/main/java/org/apache/flink/util/InstantiationUtil.java ---
    @@ -71,13 +139,18 @@ public ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) thr
     						// return primitive class
     						return cl;
     					} else {
    -						// throw ClassNotFoundException
    -						throw ex;
    +						// search among the compiled classes too
    +						return Class.forName(name, false, loaderForGeneratedClasses.get(name));
     					}
     				}
     			}
     
    -			return super.resolveClass(desc);
    +			try {
    +				return super.resolveClass(desc);
    +			} catch (ClassNotFoundException ex) {
    +				// It is possible the the passed class loader is null but we still want to load generated classes.
    --- End diff --
    
    "the the"


[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r77993831
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---
    @@ -419,4 +481,61 @@ public String toString() {
     			return "NamedFlatFieldDescriptor [name="+fieldName+" position="+getPosition()+" typeInfo="+getType()+"]";
     		}
     	}
    +
    +	public static String accessStringForField(Field f) {
    +		String fieldName = f.getName();
    +		if (Modifier.isPublic(f.getModifiers())) {
    +			return fieldName;
    +		}
    +		String getterName = "get" + Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
    +		Class parentClazz = f.getDeclaringClass();
    +		try {
    +			parentClazz.getMethod(getterName, new Class[0]);
    +		} catch (NoSuchMethodException e) {
    +			// No getter, it might be a scala class.
    +			return fieldName + "()";
    +		}
    +		return getterName + "()";
    +	}
    +
    +	public static String modifyStringForField(Field f, String arg) {
    +		String fieldName = f.getName();
    +		if (Modifier.isPublic(f.getModifiers())) {
    +			if (f.getType().isPrimitive()) {
    +				return f.getName() + " = (" +
    +					primitiveBoxedClasses.get(f.getType().getCanonicalName()).getCanonicalName() + ")" + arg;
    --- End diff --
    
    Why do we need to cast to the boxed types if the field is non-boxed?
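
One plausible reason, stated here as an assumption rather than something the thread confirms: the generated code uses raw `TypeSerializer` types, so `copy(...)` returns `Object`, and an `Object` cannot be assigned to a primitive field without a cast. Casting to the wrapper type and letting the assignment unbox is the conservative form. The sketch below uses a hypothetical helper and a plain `Object` in place of the serializer call.

```java
final class BoxedCastDemo {

    /** Casts to the wrapper type first; the assignment to int then unboxes. */
    static int unboxViaWrapper(Object value) {
        int result = (Integer) value;
        return result;
    }

    public static void main(String[] args) {
        // Stands in for the Object returned by a raw-typed copy() call.
        Object copied = Integer.valueOf(42);
        System.out.println(unboxViaWrapper(copied)); // 42
    }
}
```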


[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r77989471
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorGenerator.java ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *	 http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils.runtime;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeutils.TypeComparator;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import java.io.IOException;
    +import java.lang.reflect.Constructor;
    +import java.lang.reflect.Field;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static org.apache.flink.api.java.typeutils.PojoTypeInfo.accessStringForField;
    +
    +public final class PojoComparatorGenerator<T> {
    +	private static final String packageName = "org.apache.flink.api.java.typeutils.runtime.generated";
    +
    +	private transient Field[] keyFields;
    +	private transient Integer[] keyFieldIds;
    +	private final TypeComparator<Object>[] comparators;
    --- End diff --
    
    You could use a wildcard (`?`) here instead of `Object`, to avoid the unchecked cast in the constructor. (The same applies to `GenTypeComparatorProxy.comparators`, so that you can pass this array there without a cast.)
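
A minimal sketch of the wildcard suggestion, using `java.util.Comparator` as a stand-in for `TypeComparator` (the class name `WildcardArrayDemo` is hypothetical and not part of the pull request):

```java
import java.util.Comparator;

final class WildcardArrayDemo {
    // Wildcard element type: any Comparator<X>[] can be stored without a cast.
    private final Comparator<?>[] comparators;

    WildcardArrayDemo(Comparator<?>[] comparators) {
        // No cast and no @SuppressWarnings("unchecked") are needed here.
        this.comparators = comparators;
    }

    int size() {
        return comparators.length;
    }

    public static void main(String[] args) {
        Comparator<?>[] cmps = {
            Comparator.<String>naturalOrder(),
            Comparator.<Integer>naturalOrder()
        };
        System.out.println(new WildcardArrayDemo(cmps).size());
    }
}
```

The trade-off is that calling `compare` on a `Comparator<?>` element needs a cast at the use site, so the unchecked operation moves rather than disappears if the elements are actually invoked.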


---

[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r77995157
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenTypeComparatorProxy.java ---
    @@ -0,0 +1,174 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils.runtime;
    +
    +import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
    +import org.apache.flink.api.common.typeutils.TypeComparator;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.MemorySegment;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.lang.reflect.Constructor;
    +import java.util.List;
    +
    +public class GenTypeComparatorProxy<T> extends CompositeTypeComparator<T> implements java.io.Serializable {
    +	private final String code;
    +	private final String name;
    +	private final Class<T> clazz;
    +	private final TypeComparator<Object>[] comparators;
    +	private final TypeSerializer<T> serializer;
    +
    +	transient private CompositeTypeComparator<T> impl = null;
    +
    +	private void compile() {
    +		try {
    +			assert impl == null;
    +			Class<?> comparatorClazz = InstantiationUtil.compile(clazz.getClassLoader(), name, code);
    +			Constructor<?>[] ctors = comparatorClazz.getConstructors();
    +			assert ctors.length == 1;
    +			impl = (CompositeTypeComparator<T>) ctors[0].newInstance(new Object[]{comparators, serializer, clazz});
    +		} catch (Exception e) {
    +			throw new RuntimeException("Unable to generate serializer: " + name, e);
    +		}
    +	}
    +
    +	public GenTypeComparatorProxy(Class<T> clazz, String name, String code,TypeComparator<Object>[] comparators,
    +									TypeSerializer<T> serializer) {
    +		this.name = name;
    +		this.code = code;
    +		this.clazz = clazz;
    +		this.comparators = comparators;
    +		this.serializer = serializer;
    +		compile();
    +	}
    +
    +	@SuppressWarnings("unchecked")
    --- End diff --
    
    This annotation is not needed if `comparators` is declared with `<?>`.


---

[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r78151473
  
    --- Diff: flink-examples/flink-examples-streaming/pom.xml ---
    @@ -84,6 +84,11 @@ under the License.
     			<scope>test</scope>
     			<type>test-jar</type>
     		</dependency>
    +		<dependency>
    --- End diff --
    
    This dependency is not needed, as the `${project.version}` is specified above.
    (Note that `ForkableFlinkMiniCluster` was replaced by `LocalFlinkMiniCluster` in 02b852e3571e46f25fdfc79f43ceb726ddff9ba7.)


---

[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r77987318
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---
    @@ -70,10 +77,41 @@
     	private static final Pattern PATTERN_NESTED_FIELDS = Pattern.compile(REGEX_NESTED_FIELDS);
     	private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
     
    +	private static final Map<Class<?>, Class<? extends TypeSerializer>> customSerializers = new HashMap<>();
    +	private static final Map<Tuple2<ArrayList<Integer>, Class>, Class<? extends TypeComparator>> customComparators =
    +		new HashMap<>();
    +
     	private final PojoField[] fields;
     	
     	private final int totalFields;
     
    +	/**
    +	 * Register a custom serializer for a type. The precedence of the serializers
    +	 * is the following (highest to lowest): Kryo, Avro, Custom, Generated, Flink.
    +	 * The chosen serializer will be the first one from the list that is turned on.
    +	 *
    +	 */
    +	public static <C, S extends TypeSerializer<C>> void registerCustomSerializer(Class<C> clazz, Class<S> ser) {
    --- End diff --
    
    Most of the other methods in this class are annotated with `@PublicEvolving`. Maybe we should add this annotation here as well. (And also below.)


---

[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r77990034
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenTypeComparatorProxy.java ---
    @@ -0,0 +1,174 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils.runtime;
    +
    +import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
    +import org.apache.flink.api.common.typeutils.TypeComparator;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.MemorySegment;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.lang.reflect.Constructor;
    +import java.util.List;
    +
    +public class GenTypeComparatorProxy<T> extends CompositeTypeComparator<T> implements java.io.Serializable {
    +	private final String code;
    +	private final String name;
    +	private final Class<T> clazz;
    +	private final TypeComparator<Object>[] comparators;
    +	private final TypeSerializer<T> serializer;
    +
    +	transient private CompositeTypeComparator<T> impl = null;
    +
    +	private void compile() {
    +		try {
    +			assert impl == null;
    +			Class<?> comparatorClazz = InstantiationUtil.compile(clazz.getClassLoader(), name, code);
    +			Constructor<?>[] ctors = comparatorClazz.getConstructors();
    +			assert ctors.length == 1;
    +			impl = (CompositeTypeComparator<T>) ctors[0].newInstance(new Object[]{comparators, serializer, clazz});
    --- End diff --
    
    This is a varargs method, so you don't need to create an array here.
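
To illustrate the point, a small standalone sketch: `Constructor.newInstance(Object...)` accepts the arguments directly, and the explicit array form is equivalent. The `Target` class and the argument values are hypothetical.

```java
import java.lang.reflect.Constructor;

final class VarargsDemo {
    // Hypothetical class with a single public three-argument constructor.
    public static final class Target {
        final String label;
        final Integer count;
        final Class<?> type;

        public Target(String label, Integer count, Class<?> type) {
            this.label = label;
            this.count = count;
            this.type = type;
        }
    }

    // Explicit array, as in the diff above.
    static Target viaExplicitArray() {
        try {
            Constructor<?> ctor = Target.class.getConstructors()[0];
            return (Target) ctor.newInstance(new Object[]{"x", 1, String.class});
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    // Same call with the arguments passed as varargs.
    static Target viaVarargs() {
        try {
            Constructor<?> ctor = Target.class.getConstructors()[0];
            return (Target) ctor.newInstance("x", 1, String.class);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(viaVarargs().label.equals(viaExplicitArray().label));
    }
}
```

One caveat: when the only argument is itself an array, it has to be cast to `Object` (or wrapped) so that it is not taken as the varargs array. With three arguments, as here, that ambiguity does not arise.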


---

[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r77990262
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorGenerator.java ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *	 http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils.runtime;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeutils.TypeComparator;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import java.io.IOException;
    +import java.lang.reflect.Constructor;
    +import java.lang.reflect.Field;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static org.apache.flink.api.java.typeutils.PojoTypeInfo.accessStringForField;
    +
    +public final class PojoComparatorGenerator<T> {
    +	private static final String packageName = "org.apache.flink.api.java.typeutils.runtime.generated";
    +
    +	private transient Field[] keyFields;
    +	private transient Integer[] keyFieldIds;
    +	private final TypeComparator<Object>[] comparators;
    +	private final TypeSerializer<T> serializer;
    +	private final Class<T> type;
    +	private final ExecutionConfig config;
    +	private String code;
    +
    +	public PojoComparatorGenerator(Field[] keyFields, TypeComparator<?>[] comparators, TypeSerializer<T> serializer,
    +									Class<T> type, Integer[] keyFieldIds, ExecutionConfig config) {
    +		this.keyFields = keyFields;
    +		this.comparators = (TypeComparator<Object>[]) comparators;
    +
    +		this.type = type;
    +		this.serializer = serializer;
    +		this.keyFieldIds = keyFieldIds;
    +		this.config = config;
    +	}
    +
    +	public TypeComparator<T> createComparator() {
    +		// Multiple comparators can be generated for each type based on a list of keys. The list of keys and the type
    +		// name should determine the generated comparator. This information is used for caching (avoiding
    +		// recompilation). Note that, the name of the field is not sufficient because nested POJOs might have a field
    +		// with the name.
    +		StringBuilder keyBuilder = new StringBuilder();
    +		for(Integer i : keyFieldIds) {
    +			keyBuilder.append(i);
    +			keyBuilder.append("_");
    +		}
    +		final String className = type.getCanonicalName().replace('.', '_') + "_GeneratedComparator" +
    +			keyBuilder.toString();
    +		final String fullClassName = packageName + "." + className;
    +		Class<?> comparatorClazz;
    +		code = InstantiationUtil.getCodeForCachedClass(fullClassName);
    +		if (code == null) {
    +			generateCode(className);
    +		}
    +		if (config.isWrapGeneratedClassesEnabled()) {
    +			return new GenTypeComparatorProxy<>(type, fullClassName, code, comparators, serializer);
    +		}
    +		try {
    +			comparatorClazz = InstantiationUtil.compile(type.getClassLoader(), fullClassName, code);
    +		} catch (Exception e) {
    +			throw new RuntimeException("Unable to generate comparator: " + className, e);
    +		}
    +		Constructor<?>[] ctors = comparatorClazz.getConstructors();
    +		assert ctors.length == 1;
    +		try {
    +			return (TypeComparator<T>) ctors[0].newInstance(new Object[]{comparators, serializer, type});
    --- End diff --
    
    `newInstance` is a varargs method, so the `new Object[]{...}` wrapper is not needed here either.


---

[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r77993053
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---
    @@ -419,4 +481,61 @@ public String toString() {
     			return "NamedFlatFieldDescriptor [name="+fieldName+" position="+getPosition()+" typeInfo="+getType()+"]";
     		}
     	}
    +
    +	public static String accessStringForField(Field f) {
    +		String fieldName = f.getName();
    +		if (Modifier.isPublic(f.getModifiers())) {
    +			return fieldName;
    +		}
    +		String getterName = "get" + Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
    +		Class parentClazz = f.getDeclaringClass();
    +		try {
    +			parentClazz.getMethod(getterName, new Class[0]);
    +		} catch (NoSuchMethodException e) {
    +			// No getter, it might be a scala class.
    +			return fieldName + "()";
    +		}
    +		return getterName + "()";
    +	}
    +
    +	public static String modifyStringForField(Field f, String arg) {
    +		String fieldName = f.getName();
    +		if (Modifier.isPublic(f.getModifiers())) {
    +			if (f.getType().isPrimitive()) {
    +				return f.getName() + " = (" +
    +					primitiveBoxedClasses.get(f.getType().getCanonicalName()).getCanonicalName() + ")" + arg;
    +			} else {
    +				return f.getName() + " = (" + f.getType().getCanonicalName() + ")" + arg;
    +			}
    +		}
    +		String setterName = "set" + Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
    +		Class parentClazz = f.getDeclaringClass();
    +		try {
    +			parentClazz.getMethod(setterName, f.getType());
    +		} catch (NoSuchMethodException e) {
    +			// No getter, it might be a scala class.
    --- End diff --
    
    Same question as above.


---

[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by Xazax-hun <gi...@git.apache.org>.
Github user Xazax-hun commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r78087704
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerGenerator.java ---
    @@ -0,0 +1,259 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *	 http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils.runtime;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import java.io.IOException;
    +import java.lang.reflect.Constructor;
    +import java.lang.reflect.Field;
    +import java.lang.reflect.Modifier;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +import static org.apache.flink.api.java.typeutils.PojoTypeInfo.accessStringForField;
    +import static org.apache.flink.api.java.typeutils.PojoTypeInfo.modifyStringForField;
    +
    +public final class PojoSerializerGenerator<T> {
    +	private static final String packageName = "org.apache.flink.api.java.typeutils.runtime.generated";
    +
    +	private final Class<T> clazz;
    +	private final Field[] refFields;
    +	private final TypeSerializer<?>[] fieldSerializers;
    +	private final ExecutionConfig config;
    +	private String code;
    +
    +	public PojoSerializerGenerator(
    +		Class<T> clazz,
    +		TypeSerializer<?>[] fields,
    +		Field[] reflectiveFields,
    +		ExecutionConfig config) {
    +		this.clazz = checkNotNull(clazz);
    +		this.refFields = checkNotNull(reflectiveFields);
    +		this.fieldSerializers = checkNotNull(fields);
    +		this.config = checkNotNull(config);
    +		for (int i = 0; i < this.refFields.length; i++) {
    +			this.refFields[i].setAccessible(true);
    +		}
    +	}
    +
    +	public TypeSerializer<T> createSerializer()  {
    +		final String className = clazz.getCanonicalName().replace('.', '_') + "_GeneratedSerializer";
    +		final String fullClassName = packageName + "." + className;
    +		Class<?> serializerClazz;
    +		code = InstantiationUtil.getCodeForCachedClass(fullClassName);
    +		if (code == null) {
    +			generateCode(className);
    +		}
    +		if(config.isWrapGeneratedClassesEnabled()) {
    +			return new GenTypeSerializerProxy<>(clazz, fullClassName, code, fieldSerializers, config);
    +		}
    +		try {
    +			serializerClazz = InstantiationUtil.compile(clazz.getClassLoader(), fullClassName, code);
    +		}
    +		catch (Exception e) {
    +			throw new RuntimeException("Unable to generate serializer: " + className, e);
    +		}
    +		Constructor<?>[] ctors = serializerClazz.getConstructors();
    +		assert ctors.length == 1;
    +		try {
    +			return (TypeSerializer<T>) ctors[0].newInstance(new Object[]{clazz, fieldSerializers, config});
    +		} catch (Exception e) {
    +			throw new RuntimeException("Unable to instantiate serializer: " + className, e);
    +		}
    +
    +	}
    +
    +	private void generateCode(String className) {
    +		assert fieldSerializers.length > 0;
    +		String typeName = clazz.getCanonicalName();
    +		StringBuilder members = new StringBuilder();
    +		for (int i = 0; i < fieldSerializers.length; ++i) {
    +			members.append(String.format("final TypeSerializer f%d;\n", i));
    +		}
    +		StringBuilder initMembers = new StringBuilder();
    +		for (int i = 0; i < fieldSerializers.length; ++i) {
    +			initMembers.append(String.format("f%d = serializerFields[%d];\n", i, i));
    +		}
    +		StringBuilder createFields = new StringBuilder();
    +		for (int i = 0; i < fieldSerializers.length; ++i) {
    +			createFields.append(String.format("((" + typeName + ")t)." + modifyStringForField(refFields[i],
    +				"f%d.createInstance()") + ";\n", i));
    +		}
    +		StringBuilder copyFields = new StringBuilder();
    +		copyFields.append("Object value;\n");
    +		for (int i = 0; i < fieldSerializers.length; ++i) {
    +			if (refFields[i].getType().isPrimitive()) {
    +				copyFields.append(String.format("((" + typeName + ")target)." + modifyStringForField(refFields[i],
    +					"f%d.copy(((" + typeName + ")from)." + accessStringForField(refFields[i])) + ");\n", i));
    +			} else {
    +				copyFields.append(String.format(
    +					"value = ((" + typeName + ")from)." + accessStringForField(refFields[i]) + ";\n" +
    +					"if (value != null) {\n" +
    +					"	((" + typeName + ")target)." + modifyStringForField(refFields[i], "f%d.copy(value)") + ";\n" +
    +					"} else {\n" +
    +					"	((" + typeName + ")target)." + modifyStringForField(refFields[i], "null") + ";\n" +
    +					"}\n", i));
    +			}
    +		}
    +		StringBuilder reuseCopyFields = new StringBuilder();
    +		reuseCopyFields.append("Object value;\n");
    +		reuseCopyFields.append("Object reuseValue;\n");
    +		reuseCopyFields.append("Object copy;\n");
    +		for (int i = 0; i < fieldSerializers.length; ++i) {
    +			if (refFields[i].getType().isPrimitive()) {
    +				reuseCopyFields.append(String.format("((" + typeName + ")reuse)." + modifyStringForField(refFields[i],
    +					"f%d.copy(((" + typeName + ")from)." + accessStringForField(refFields[i]) + ")") + ";\n", i));
    +			} else {
    +				reuseCopyFields.append(String.format(
    +					"value = ((" + typeName + ")from)." + accessStringForField(refFields[i]) + ";\n" +
    +					"if (value != null) {\n" +
    +					"	reuseValue = ((" + typeName + ")reuse)." + accessStringForField(refFields[i]) + ";\n" +
    +					"	if (reuseValue != null) {\n" +
    --- End diff --
    
    I think this is already specialized, but I may be misunderstanding what you mean.


---

[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r78003840
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerGenerator.java ---
    @@ -0,0 +1,259 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *	 http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils.runtime;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import java.io.IOException;
    +import java.lang.reflect.Constructor;
    +import java.lang.reflect.Field;
    +import java.lang.reflect.Modifier;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +import static org.apache.flink.api.java.typeutils.PojoTypeInfo.accessStringForField;
    +import static org.apache.flink.api.java.typeutils.PojoTypeInfo.modifyStringForField;
    +
    +public final class PojoSerializerGenerator<T> {
    +	private static final String packageName = "org.apache.flink.api.java.typeutils.runtime.generated";
    +
    +	private final Class<T> clazz;
    +	private final Field[] refFields;
    +	private final TypeSerializer<?>[] fieldSerializers;
    +	private final ExecutionConfig config;
    +	private String code;
    +
    +	public PojoSerializerGenerator(
    +		Class<T> clazz,
    +		TypeSerializer<?>[] fields,
    +		Field[] reflectiveFields,
    +		ExecutionConfig config) {
    +		this.clazz = checkNotNull(clazz);
    +		this.refFields = checkNotNull(reflectiveFields);
    +		this.fieldSerializers = checkNotNull(fields);
    +		this.config = checkNotNull(config);
    +		for (int i = 0; i < this.refFields.length; i++) {
    +			this.refFields[i].setAccessible(true);
    +		}
    +	}
    +
    +	public TypeSerializer<T> createSerializer()  {
    +		final String className = clazz.getCanonicalName().replace('.', '_') + "_GeneratedSerializer";
    +		final String fullClassName = packageName + "." + className;
    +		Class<?> serializerClazz;
    +		code = InstantiationUtil.getCodeForCachedClass(fullClassName);
    +		if (code == null) {
    +			generateCode(className);
    +		}
    +		if(config.isWrapGeneratedClassesEnabled()) {
    +			return new GenTypeSerializerProxy<>(clazz, fullClassName, code, fieldSerializers, config);
    +		}
    +		try {
    +			serializerClazz = InstantiationUtil.compile(clazz.getClassLoader(), fullClassName, code);
    +		}
    +		catch (Exception e) {
    +			throw new RuntimeException("Unable to generate serializer: " + className, e);
    +		}
    +		Constructor<?>[] ctors = serializerClazz.getConstructors();
    +		assert ctors.length == 1;
    +		try {
    +			return (TypeSerializer<T>) ctors[0].newInstance(new Object[]{clazz, fieldSerializers, config});
    +		} catch (Exception e) {
    +			throw new RuntimeException("Unable to instantiate serializer: " + className, e);
    +		}
    +
    +	}
    +
    +	private void generateCode(String className) {
    +		assert fieldSerializers.length > 0;
    +		String typeName = clazz.getCanonicalName();
    +		StringBuilder members = new StringBuilder();
    +		for (int i = 0; i < fieldSerializers.length; ++i) {
    +			members.append(String.format("final TypeSerializer f%d;\n", i));
    +		}
    +		StringBuilder initMembers = new StringBuilder();
    +		for (int i = 0; i < fieldSerializers.length; ++i) {
    +			initMembers.append(String.format("f%d = serializerFields[%d];\n", i, i));
    +		}
    +		StringBuilder createFields = new StringBuilder();
    +		for (int i = 0; i < fieldSerializers.length; ++i) {
    +			createFields.append(String.format("((" + typeName + ")t)." + modifyStringForField(refFields[i],
    +				"f%d.createInstance()") + ";\n", i));
    +		}
    +		StringBuilder copyFields = new StringBuilder();
    +		copyFields.append("Object value;\n");
    +		for (int i = 0; i < fieldSerializers.length; ++i) {
    +			if (refFields[i].getType().isPrimitive()) {
    +				copyFields.append(String.format("((" + typeName + ")target)." + modifyStringForField(refFields[i],
    +					"f%d.copy(((" + typeName + ")from)." + accessStringForField(refFields[i])) + ");\n", i));
    +			} else {
    +				copyFields.append(String.format(
    +					"value = ((" + typeName + ")from)." + accessStringForField(refFields[i]) + ";\n" +
    +					"if (value != null) {\n" +
    +					"	((" + typeName + ")target)." + modifyStringForField(refFields[i], "f%d.copy(value)") + ";\n" +
    +					"} else {\n" +
    +					"	((" + typeName + ")target)." + modifyStringForField(refFields[i], "null") + ";\n" +
    +					"}\n", i));
    +			}
    +		}
    +		StringBuilder reuseCopyFields = new StringBuilder();
    +		reuseCopyFields.append("Object value;\n");
    +		reuseCopyFields.append("Object reuseValue;\n");
    +		reuseCopyFields.append("Object copy;\n");
    +		for (int i = 0; i < fieldSerializers.length; ++i) {
    +			if (refFields[i].getType().isPrimitive()) {
    +				reuseCopyFields.append(String.format("((" + typeName + ")reuse)." + modifyStringForField(refFields[i],
    +					"f%d.copy(((" + typeName + ")from)." + accessStringForField(refFields[i]) + ")") + ";\n", i));
    +			} else {
    +				reuseCopyFields.append(String.format(
    +					"value = ((" + typeName + ")from)." + accessStringForField(refFields[i]) + ";\n" +
    +					"if (value != null) {\n" +
    +					"	reuseValue = ((" + typeName + ")reuse)." + accessStringForField(refFields[i]) + ";\n" +
    +					"	if (reuseValue != null) {\n" +
    --- End diff --
    
    Maybe it's worth it to specialize here for primitive types, since in that case you can omit all these null checks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r77987079
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---
    @@ -70,10 +77,41 @@
     	private static final Pattern PATTERN_NESTED_FIELDS = Pattern.compile(REGEX_NESTED_FIELDS);
     	private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
     
    +	private static final Map<Class<?>, Class<? extends TypeSerializer>> customSerializers = new HashMap<>();
    +	private static final Map<Tuple2<ArrayList<Integer>, Class>, Class<? extends TypeComparator>> customComparators =
    +		new HashMap<>();
    +
     	private final PojoField[] fields;
     	
     	private final int totalFields;
     
    +	/**
    +	 * Register a custom serializer for a type. The precedence of the serializers
    +	 * is the following (highest to lowest): Kryo, Avro, Custom, Generated, Flink.
    +	 * The chosen serializer will be the first one from the list that is turned on.
    +	 *
    +	 */
    +	public static <C, S extends TypeSerializer<C>> void registerCustomSerializer(Class<C> clazz, Class<S> ser) {
    +		Constructor<?>[] ctors = ser.getConstructors();
    +		assert ctors.length == 1;
    +		assert ctors[0].getParameterTypes().length == 0;
    --- End diff --
    
    Is it documented somewhere that custom serializers have to have these properties?
    
    Also, these shouldn't be asserts, but throw exceptions instead. (I think asserts are generally for internal consistency stuff, i.e., they should fire only when you have a bug, and not when some stuff that a user gave us is not in the right form.)
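
A minimal sketch of what the suggested change could look like, assuming the requirement really is "exactly one public constructor, taking no arguments", as the asserts imply. The class and method names are hypothetical and not part of the PR, and the choice of `IllegalArgumentException` is likewise an assumption.

```java
import java.lang.reflect.Constructor;

// Hypothetical helper, not part of the PR: validates user-supplied classes
// with exceptions, so the check also runs when assertions are disabled.
final class CustomSerializerChecks {

    static void requireSingleNoArgConstructor(Class<?> ser) {
        // getConstructors() returns only the public constructors.
        Constructor<?>[] ctors = ser.getConstructors();
        if (ctors.length != 1) {
            throw new IllegalArgumentException(
                ser.getName() + " must have exactly one public constructor, found " + ctors.length);
        }
        if (ctors[0].getParameterTypes().length != 0) {
            throw new IllegalArgumentException(
                ser.getName() + " must have a public no-argument constructor");
        }
    }

    // Two toy classes for trying the check out.
    static final class NoArg { public NoArg() {} }
    static final class OneArg { public OneArg(int unused) {} }

    public static void main(String[] args) {
        requireSingleNoArgConstructor(NoArg.class); // passes silently
        try {
            requireSingleNoArgConstructor(OneArg.class);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Unlike `assert`, these checks do not depend on the JVM being started with `-ea`, which is the point of the review comment.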


---

[GitHub] flink issue #2211: [WIP][FLINK-3599] Code generation for PojoSerializer and ...

Posted by mbalassi <gi...@git.apache.org>.
Github user mbalassi commented on the issue:

    https://github.com/apache/flink/pull/2211
  
    @Xazax-hun you have not addressed @greghogan's initial comment on enabling code generation by default. I do agree with @greghogan in the sense that we should take the usual, humble road of first introducing and advertising the feature and once it receives positive user feedback and community support make it the default.


---

[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r78153106
  
    --- Diff: flink-core/src/main/resources/PojoComparatorTemplate.ftl ---
    @@ -0,0 +1,195 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *	 http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package ${packageName};
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +import java.util.List;
    +import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
    +import org.apache.flink.api.common.typeutils.TypeComparator;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.MemorySegment;
    +import org.apache.flink.types.NullKeyFieldException;
    +import org.apache.flink.api.java.typeutils.runtime.TupleComparatorBase;
    +import org.apache.flink.api.java.typeutils.runtime.GenTypeComparatorProxy;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +public final class ${className} extends CompositeTypeComparator implements java.io.Serializable {
    --- End diff --
    
    OK.


---

[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r69951927
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---
    @@ -70,10 +75,21 @@
     	private static final Pattern PATTERN_NESTED_FIELDS = Pattern.compile(REGEX_NESTED_FIELDS);
     	private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
     
    +	private static final Map<Class<?>, Class<? extends TypeSerializer>> customSerializers = new HashMap<>();
    +	private static final Map<Class<?>, Class<? extends TypeComparator>> customComparators = new HashMap<>();
    +
     	private final PojoField[] fields;
     	
     	private final int totalFields;
     
    +	public static <C, S extends TypeSerializer<C>> void registerCustomSerializer(Class<C> c, Class<S> s) {
    --- End diff --
    
    Still needs to be documented (as does `registerCustomComparator`), but I wanted to note that it would be good to document the order used in `createSerializer` ... Kryo, Avro, custom, code-generated.


---

[GitHub] flink issue #2211: [WIP][FLINK-3599] Code generation for PojoSerializer and ...

Posted by Xazax-hun <gi...@git.apache.org>.
Github user Xazax-hun commented on the issue:

    https://github.com/apache/flink/pull/2211
  
    Hi Ufuk!
    I have some good news. With this version, code distribution is a solved problem. I did run into a problem with checkpointing, but I think that shouldn't be too hard to solve. I think I can allocate time to address the review comments and, probably with some help, to fix the problem with the checkpoints. I am not sure, however, that I could allocate time for the next big steps (like implementing more advanced code generation techniques).


---

[GitHub] flink issue #2211: [WIP][FLINK-3599] Code generation for PojoSerializer and ...

Posted by PhilippGrulich <gi...@git.apache.org>.
Github user PhilippGrulich commented on the issue:

    https://github.com/apache/flink/pull/2211
  
    Hi,
    
    what is the current status of this?
    Is there still any major blocker?
    
    Best,
    Philipp


---

[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r77992416
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---
    @@ -419,4 +481,61 @@ public String toString() {
     			return "NamedFlatFieldDescriptor [name="+fieldName+" position="+getPosition()+" typeInfo="+getType()+"]";
     		}
     	}
    +
    +	public static String accessStringForField(Field f) {
    +		String fieldName = f.getName();
    +		if (Modifier.isPublic(f.getModifiers())) {
    +			return fieldName;
    +		}
    +		String getterName = "get" + Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
    +		Class parentClazz = f.getDeclaringClass();
    --- End diff --
    
    `Class<?>` to avoid the unchecked call to `getMethod` below.


---

[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r69949156
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---
    @@ -99,6 +99,8 @@
     
     	private boolean forceAvro = false;
     
    +	private boolean forceCodeGeneration = true;
    --- End diff --
    
    Standard practice is to leave new features off by default.


---

[GitHub] flink issue #2211: [WIP][FLINK-3599] Code generation for PojoSerializer and ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2211
  
    Ah, thanks for pointing out the dependency on #2094. I wasn't aware of that. 
    Will try to push that PR further then :-).


---

[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by greghogan <gi...@git.apache.org>.
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r69951075
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---
    @@ -315,6 +331,23 @@ public int getFieldIndex(String fieldName) {
     			reflectiveFields[i] = fields[i].getField();
     		}
     
    +		if (customSerializers.containsKey(this.getTypeClass())) {
    +			Constructor<?>[] ctors = customSerializers.get(this.getTypeClass()).getConstructors();
    +			assert ctors.length == 1;
    +			try {
    +				return (TypeSerializer<T>) ctors[0].newInstance(new Object[]{getTypeClass(), fieldSerializers, config});
    +
    +			} catch (Throwable t) {
    --- End diff --
    
    Stephan noted to me a little while back that IntelliJ convention for an unused exception is to name the variable `ignored`, so `} catch (Throwable ignored) {`.


---

[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r78154021
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorGenerator.java ---
    @@ -0,0 +1,191 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *	 http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils.runtime;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeutils.TypeComparator;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import java.io.IOException;
    +import java.lang.reflect.Field;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static org.apache.flink.api.java.typeutils.PojoTypeInfo.accessStringForField;
    +
    +public final class PojoComparatorGenerator<T> {
    +	private static final String packageName = "org.apache.flink.api.java.typeutils.runtime.generated";
    +
    +	private transient Field[] keyFields;
    +	private transient Integer[] keyFieldIds;
    +	private final TypeComparator<?>[] comparators;
    +	private final TypeSerializer<T> serializer;
    +	private final Class<T> type;
    +	private final ExecutionConfig config;
    +	private String code;
    +
    +	public PojoComparatorGenerator(Field[] keyFields, TypeComparator<?>[] comparators, TypeSerializer<T> serializer,
    +									Class<T> type, Integer[] keyFieldIds, ExecutionConfig config) {
    +		this.keyFields = keyFields;
    +		this.comparators = comparators;
    +
    +		this.type = type;
    +		this.serializer = serializer;
    +		this.keyFieldIds = keyFieldIds;
    +		this.config = config;
    +	}
    +
    +	public TypeComparator<T> createComparator() {
    +		// Multiple comparators can be generated for each type based on a list of keys. The list of keys and the type
    +		// name should determine the generated comparator. This information is used for caching (avoiding
    +		// recompilation). Note that, the name of the field is not sufficient because nested POJOs might have a field
    +		// with the name.
    +		StringBuilder keyBuilder = new StringBuilder();
    +		for(Integer i : keyFieldIds) {
    +			keyBuilder.append(i);
    +			keyBuilder.append("_");
    +		}
    +		final String className = type.getCanonicalName().replace('.', '_') + "_GeneratedComparator" +
    +			keyBuilder.toString();
    +		final String fullClassName = packageName + "." + className;
    +		code = InstantiationUtil.getCodeForCachedClass(fullClassName);
    +		if (code == null) {
    +			generateCode(className);
    +		}
    +		return new GenTypeComparatorProxy<>(type, fullClassName, code, comparators, serializer);
    +	}
    +
    +
    +	private void generateCode(String className) {
    +		String typeName = type.getCanonicalName();
    +		StringBuilder members = new StringBuilder();
    +		for (int i = 0; i < comparators.length; ++i) {
    +			members.append(String.format("final TypeComparator f%d;\n", i));
    --- End diff --
    
    same specialization here as in `PojoSerializerGenerator`


---

[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r77986671
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---
    @@ -70,10 +77,41 @@
     	private static final Pattern PATTERN_NESTED_FIELDS = Pattern.compile(REGEX_NESTED_FIELDS);
     	private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
     
    +	private static final Map<Class<?>, Class<? extends TypeSerializer>> customSerializers = new HashMap<>();
    +	private static final Map<Tuple2<ArrayList<Integer>, Class>, Class<? extends TypeComparator>> customComparators =
    +		new HashMap<>();
    +
     	private final PojoField[] fields;
     	
     	private final int totalFields;
     
    +	/**
    +	 * Register a custom serializer for a type. The precedence of the serializers
    +	 * is the following (highest to lowest): Kryo, Avro, Custom, Generated, Flink.
    +	 * The chosen serializer will be the first one from the list that is turned on.
    +	 *
    +	 */
    +	public static <C, S extends TypeSerializer<C>> void registerCustomSerializer(Class<C> clazz, Class<S> ser) {
    +		Constructor<?>[] ctors = ser.getConstructors();
    +		assert ctors.length == 1;
    +		assert ctors[0].getParameterTypes().length == 0;
    +		customSerializers.put(clazz, ser);
    +	}
    +
    +	/**
    +	 * Register a custom comparator for a type. The precedence of the serializers
    --- End diff --
    
    "serializers" -- You mean comparators?


---

[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r77991234
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---
    @@ -70,10 +77,41 @@
     	private static final Pattern PATTERN_NESTED_FIELDS = Pattern.compile(REGEX_NESTED_FIELDS);
     	private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
     
    +	private static final Map<Class<?>, Class<? extends TypeSerializer>> customSerializers = new HashMap<>();
    +	private static final Map<Tuple2<ArrayList<Integer>, Class>, Class<? extends TypeComparator>> customComparators =
    --- End diff --
    
    I would add `<?>` to `TypeComparator` to avoid some unchecked casts later on.


---

[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r77990530
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---
    @@ -377,9 +425,11 @@ public void initializeTypeComparatorBuilder(int size) {
     		public void addComparatorField(int fieldId, TypeComparator<?> comparator) {
     			fieldComparators.add(comparator);
     			keyFields.add(fields[fieldId].getField());
    +			keyFieldIds.add(fieldId);
     		}
     
     		@Override
    +		@SuppressWarnings("unchecked")
    --- End diff --
    
    Use `Tuple2.of` to avoid the unchecked cast.
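
A rough illustration of why a generic static factory avoids the unchecked warning. The `Pair` class below is a hypothetical stand-in for Flink's `Tuple2` so that the sketch compiles without Flink on the classpath; the registry and its method names are also assumptions, not code from the PR.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

final class PairKeySketch {

    // Hypothetical stand-in for Tuple2; only what the sketch needs.
    static final class Pair<A, B> {
        final A first;
        final B second;

        private Pair(A first, B second) {
            this.first = first;
            this.second = second;
        }

        // Type arguments are inferred from the arguments, so no raw type
        // and no @SuppressWarnings("unchecked") is needed at the call site.
        static <A, B> Pair<A, B> of(A first, B second) {
            return new Pair<>(first, second);
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Pair)) {
                return false;
            }
            Pair<?, ?> other = (Pair<?, ?>) o;
            return Objects.equals(first, other.first) && Objects.equals(second, other.second);
        }

        @Override
        public int hashCode() {
            return Objects.hash(first, second);
        }
    }

    private static final Map<Pair<List<Integer>, Class<?>>, String> REGISTRY = new HashMap<>();

    static void register(List<Integer> keyIds, Class<?> type, String comparatorName) {
        Pair<List<Integer>, Class<?>> key = Pair.of(keyIds, type);
        REGISTRY.put(key, comparatorName);
    }

    static String lookup(List<Integer> keyIds, Class<?> type) {
        return REGISTRY.get(Pair.of(keyIds, type));
    }
}
```

The same effect can be had with the diamond operator on a constructor call; the factory is simply the form the comment asks for.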


---

[GitHub] flink issue #2211: [WIP][FLINK-3599] Code generation for PojoSerializer and ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/2211
  
    Hi @Xazax-hun, is the PR still WIP? 
    Can you remove the [WIP] tag from the title if it is good to review? 
    
    Thanks, Fabian


---

[GitHub] flink issue #2211: [WIP][FLINK-3599] Code generation for PojoSerializer and ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on the issue:

    https://github.com/apache/flink/pull/2211
  
    Hi,
    
    There were a number of bugs and painful parts in the code, and unfortunately I'm not really sure whether they are major blockers or easily fixable. It surely wouldn't be 1-2 days of work, but a few weeks might be realistic.
    
    As an example off the top of my head, we choke in situations where the POJO has a generic argument T and a field of type T, and we tell Flink about the generic argument with a `.returns`.
    
    There were some other random test failures in older tests, which would require some debugging.
    
    I think we also have the problem that we don't correctly clean up the old generated classes, so they cause a memory leak across Flink jobs.
    
    Another potential issue is that since this PR, there was a new mechanism added to the Flink serializers for dealing with compatibility of serializers (e.g., to ensure that savepoints work across Flink versions), and I'm not sure whether it would be straightforward to update this PR for this. (For example, `PojoSerializer.ensureCompatibility` did not exist back then, and it looks a bit scary at first glance.)
    
    One of the pain points was shipping generated code from the driver program to the TMs. The problem is that if you simply serialize an instance of a generated class in the driver (for shipping the job to the cluster), the TM cannot just simply deserialize it, since the generated class does not exist in that JVM. The good news here is that since this PR I accidentally stumbled upon a new solution to this (using `writeReplace` and `readResolve`), which would be nicer than the current solution of this PR.
    
    Note that there is also this PR for similar code generation for the sorters:
    https://github.com/apache/flink/pull/3511
    This has a lot of similarities, but fewer pain points, because the sorters are better isolated from the other parts of Flink than the serializers, and also because there we don't have to ship generated code from the driver to the TMs (because the sorters are created on the TMs). So I think if we wanted to push this serializer codegen PR, then first we should start with the sorter codegen PR instead, because it is the easier one of the two. I think that one is actually quite close to being mergeable. (Note that I think we solved the "cleanup between Flink jobs" issue in that PR, and that solution could be adapted to this PR.)
    
    Btw. we are in the planning phase of an MSc thesis with (@mukrram-bajwa), whose work might subsume both of these PRs. The idea is to use Graal to do many of the specializations that are in these PRs. The nice thing would be that the modifications to Flink's code would be much less compared to these PRs, as the magic would happen mostly inside a Graal compilation plugin as custom optimization passes.
    
    Best,
    Gábor
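
The `writeReplace` / `readResolve` idea mentioned in the comment above can be sketched with plain Java serialization. This is only an illustration under stated assumptions: all class names are hypothetical, the "generated" class is an ordinary class here, and the proxy merely carries a source string where a real version would presumably compile that source on the receiving JVM before instantiating the result.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class ReplaceResolveSketch {

    interface Greeter {
        String greet();
    }

    // Stands in for an instance of a generated class (hypothetical).
    static final class GeneratedGreeter implements Greeter, Serializable {
        private static final long serialVersionUID = 1L;
        private final String source;

        GeneratedGreeter(String source) {
            this.source = source;
        }

        @Override
        public String greet() {
            return "compiled from: " + source;
        }

        // Java serialization calls this and writes the returned object instead,
        // so the stream never mentions GeneratedGreeter itself.
        private Object writeReplace() {
            return new SourceProxy(source);
        }
    }

    // Only this class has to be loadable on the receiving side.
    static final class SourceProxy implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String source;

        SourceProxy(String source) {
            this.source = source;
        }

        // Called after the proxy is deserialized; the returned object replaces it.
        // Assumption: a real version would compile `source` here first.
        private Object readResolve() {
            return new GeneratedGreeter(source);
        }
    }

    // Serializes and deserializes in memory, wrapping checked exceptions.
    static Object roundTrip(Object o) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(o);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Calling `roundTrip(new GeneratedGreeter("..."))` hands back a `GeneratedGreeter` again, although only the proxy travelled through the stream.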


---

[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r77986502
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---
    @@ -70,10 +77,41 @@
     	private static final Pattern PATTERN_NESTED_FIELDS = Pattern.compile(REGEX_NESTED_FIELDS);
     	private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
     
    +	private static final Map<Class<?>, Class<? extends TypeSerializer>> customSerializers = new HashMap<>();
    +	private static final Map<Tuple2<ArrayList<Integer>, Class>, Class<? extends TypeComparator>> customComparators =
    +		new HashMap<>();
    +
     	private final PojoField[] fields;
     	
     	private final int totalFields;
     
    +	/**
    +	 * Register a custom serializer for a type. The precedence of the serializers
    +	 * is the following (highest to lowest): Kryo, Avro, Custom, Generated, Flink.
    +	 * The chosen serializer will be the first one from the list that is turned on.
    --- End diff --
    
    I think the wording "turned on" is a bit confusing, because it means different things for the different elements of this list.
    - I guess for Kryo, Avro, and Generated, you mean that `enableForceKryo`, `enableForceAvro`, or `enableCodeGeneration` was called?
    - For "Custom", you mean that this method was called for the particular type.
    - For "Flink", you are talking about `PojoSerializer`? This is kind of always turned on. However, it is not always applicable (if the type is not a POJO), in which case we fall back to Kryo, even if it is not "turned on" in the above sense, which is also confusing.
    
    Another problem is that I'm not sure what happens with Tuples (and basic types and other special types) if `enableForceKryo` was called. I guess they don't participate in this mess and always get serialized by their special built-in serializers?
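
One way to make "turned on" concrete is to spell out the condition behind each entry. The sketch below encodes only the order stated in the Javadoc under review, with hypothetical names throughout; it is not Flink's actual selection logic, and it deliberately leaves out the cases raised above (fallback to Kryo for non-POJO types, tuples, and basic types).

```java
final class SerializerChoiceSketch {

    enum Kind { KRYO, AVRO, CUSTOM, GENERATED, FLINK_POJO }

    // Each flag names what "turned on" means for that entry:
    // a force/enable call on the config, or a registration for the type.
    static Kind choose(boolean forceKryo,
                       boolean forceAvro,
                       boolean customRegisteredForType,
                       boolean codeGenerationEnabled) {
        if (forceKryo) {
            return Kind.KRYO;
        }
        if (forceAvro) {
            return Kind.AVRO;
        }
        if (customRegisteredForType) {
            return Kind.CUSTOM;
        }
        if (codeGenerationEnabled) {
            return Kind.GENERATED;
        }
        // Always available for POJO types, so it acts as the default.
        return Kind.FLINK_POJO;
    }
}
```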


---

[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r77992481
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---
    @@ -419,4 +481,61 @@ public String toString() {
     			return "NamedFlatFieldDescriptor [name="+fieldName+" position="+getPosition()+" typeInfo="+getType()+"]";
     		}
     	}
    +
    +	public static String accessStringForField(Field f) {
    +		String fieldName = f.getName();
    +		if (Modifier.isPublic(f.getModifiers())) {
    +			return fieldName;
    +		}
    +		String getterName = "get" + Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
    +		Class parentClazz = f.getDeclaringClass();
    +		try {
    +			parentClazz.getMethod(getterName, new Class[0]);
    --- End diff --
    
    varargs, you can just omit the second arg
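
A small sketch combining this remark with the earlier `Class<?>` suggestion. The diff is cut off after the `getMethod` call, so what the method returns for a non-public field (here `getterName + "()"`) and how a missing getter is handled are assumptions; `Point` and `fieldOf` are hypothetical helpers for trying it out.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

final class GetterLookupSketch {

    // Toy POJO: one public field, one private field with a public getter.
    static final class Point {
        public int x;
        private int y;

        public int getY() {
            return y;
        }
    }

    static String accessString(Field f) {
        String fieldName = f.getName();
        if (Modifier.isPublic(f.getModifiers())) {
            return fieldName;
        }
        String getterName = "get" + Character.toUpperCase(fieldName.charAt(0)) + fieldName.substring(1);
        // Class<?> instead of raw Class: getMethod is then a checked call.
        Class<?> parent = f.getDeclaringClass();
        try {
            // getMethod takes varargs, so no `new Class[0]` is needed.
            parent.getMethod(getterName);
            return getterName + "()"; // assumed return shape
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(
                "No public getter " + getterName + " on " + parent.getName(), e);
        }
    }

    // Convenience wrapper that turns the checked exception into an unchecked one.
    static Field fieldOf(Class<?> owner, String name) {
        try {
            return owner.getDeclaredField(name);
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException(e);
        }
    }
}
```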


---

[GitHub] flink issue #2211: [WIP][FLINK-3599] Code generation for PojoSerializer and ...

Posted by mbalassi <gi...@git.apache.org>.
Github user mbalassi commented on the issue:

    https://github.com/apache/flink/pull/2211
  
    @Xazax-hun, it is great to see the progress in the code; we are looking forward to seeing the benchmark numbers posted here too.


---

[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r77987122
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---
    @@ -70,10 +77,41 @@
     	private static final Pattern PATTERN_NESTED_FIELDS = Pattern.compile(REGEX_NESTED_FIELDS);
     	private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
     
    +	private static final Map<Class<?>, Class<? extends TypeSerializer>> customSerializers = new HashMap<>();
    +	private static final Map<Tuple2<ArrayList<Integer>, Class>, Class<? extends TypeComparator>> customComparators =
    +		new HashMap<>();
    +
     	private final PojoField[] fields;
     	
     	private final int totalFields;
     
    +	/**
    +	 * Register a custom serializer for a type. The precedence of the serializers
    +	 * is the following (highest to lowest): Kryo, Avro, Custom, Generated, Flink.
    +	 * The chosen serializer will be the first one from the list that is turned on.
    +	 *
    +	 */
    +	public static <C, S extends TypeSerializer<C>> void registerCustomSerializer(Class<C> clazz, Class<S> ser) {
    +		Constructor<?>[] ctors = ser.getConstructors();
    +		assert ctors.length == 1;
    +		assert ctors[0].getParameterTypes().length == 0;
    +		customSerializers.put(clazz, ser);
    +	}
    +
    +	/**
    +	 * Register a custom comparator for a type. The precedence of the serializers
    +	 * is the following (highest to lowest): Custom, Generated, Flink.
    +	 * The chosen serializer will be the first one from the list that is turned on.
    +	 *
    +	 */
    +	public static <S extends TypeComparator> void registerCustomComparator(ArrayList<Integer> keyIds,
    +																			Class clazz, Class<S> comp) {
    +		Constructor<?>[] ctors = comp.getConstructors();
    +		assert ctors.length == 1;
    +		assert ctors[0].getParameterTypes().length == 0;
    --- End diff --
    
    Same problems as above.



[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r77990200
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenTypeSerializerProxy.java ---
    @@ -0,0 +1,148 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils.runtime;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.lang.reflect.Constructor;
    +
    +public class GenTypeSerializerProxy<T> extends TypeSerializer<T> {
    +	private final String code;
    +	private final String name;
    +	private final Class<T> clazz;
    +	private final TypeSerializer<?>[] fieldSerializers;
    +	private final ExecutionConfig config;
    +
    +	transient private TypeSerializer<T> impl = null;
    +
    +	private void compile() {
    +		try {
    +			assert impl == null;
    +			Class<?> serializerClazz = InstantiationUtil.compile(clazz.getClassLoader(), name, code);
    +			Constructor<?>[] ctors = serializerClazz.getConstructors();
    +			assert ctors.length == 1;
    +			impl = (TypeSerializer<T>) ctors[0].newInstance(new Object[]{clazz, fieldSerializers, config});
    --- End diff --
    
    varargs, no need to create an array
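
A short sketch of the same idea for `Constructor.newInstance(Object... initargs)`: the arguments can be passed directly. The `Pair` class below is a hypothetical stand-in for the generated serializer class, used only to keep the example self-contained.

```java
import java.lang.reflect.Constructor;

public class VarargsNewInstance {
    // Hypothetical class with a single public constructor (not from the PR).
    public static class Pair {
        public final String name;
        public final Integer count;
        public Pair(String name, Integer count) {
            this.name = name;
            this.count = count;
        }
    }

    // Instantiates Pair reflectively, passing the constructor arguments
    // as varargs instead of wrapping them in new Object[]{...}.
    public static Pair create(String name, Integer count) {
        try {
            Constructor<?> ctor = Pair.class.getConstructors()[0];
            return (Pair) ctor.newInstance(name, count);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        Pair p = create("words", 3);
        System.out.println(p.name + " " + p.count);
    }
}
```

One caveat: when the only argument is itself an array, it has to be cast to `Object` (or wrapped explicitly), otherwise the array is taken as the whole argument list. With three arguments, as in the line under review, that ambiguity does not arise.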



[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r77999725
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoComparatorGenerator.java ---
    @@ -0,0 +1,207 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *	 http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils.runtime;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeutils.TypeComparator;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import java.io.IOException;
    +import java.lang.reflect.Constructor;
    +import java.lang.reflect.Field;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static org.apache.flink.api.java.typeutils.PojoTypeInfo.accessStringForField;
    +
    +public final class PojoComparatorGenerator<T> {
    +	private static final String packageName = "org.apache.flink.api.java.typeutils.runtime.generated";
    +
    +	private transient Field[] keyFields;
    +	private transient Integer[] keyFieldIds;
    +	private final TypeComparator<Object>[] comparators;
    +	private final TypeSerializer<T> serializer;
    +	private final Class<T> type;
    +	private final ExecutionConfig config;
    +	private String code;
    +
    +	public PojoComparatorGenerator(Field[] keyFields, TypeComparator<?>[] comparators, TypeSerializer<T> serializer,
    +									Class<T> type, Integer[] keyFieldIds, ExecutionConfig config) {
    +		this.keyFields = keyFields;
    +		this.comparators = (TypeComparator<Object>[]) comparators;
    +
    +		this.type = type;
    +		this.serializer = serializer;
    +		this.keyFieldIds = keyFieldIds;
    +		this.config = config;
    +	}
    +
    +	public TypeComparator<T> createComparator() {
    +		// Multiple comparators can be generated for each type based on a list of keys. The list of keys and the type
    +		// name should determine the generated comparator. This information is used for caching (avoiding
    +		// recompilation). Note that, the name of the field is not sufficient because nested POJOs might have a field
    +		// with the name.
    +		StringBuilder keyBuilder = new StringBuilder();
    +		for(Integer i : keyFieldIds) {
    +			keyBuilder.append(i);
    +			keyBuilder.append("_");
    +		}
    +		final String className = type.getCanonicalName().replace('.', '_') + "_GeneratedComparator" +
    +			keyBuilder.toString();
    +		final String fullClassName = packageName + "." + className;
    +		Class<?> comparatorClazz;
    +		code = InstantiationUtil.getCodeForCachedClass(fullClassName);
    +		if (code == null) {
    +			generateCode(className);
    +		}
    +		if (config.isWrapGeneratedClassesEnabled()) {
    +			return new GenTypeComparatorProxy<>(type, fullClassName, code, comparators, serializer);
    +		}
    +		try {
    +			comparatorClazz = InstantiationUtil.compile(type.getClassLoader(), fullClassName, code);
    +		} catch (Exception e) {
    +			throw new RuntimeException("Unable to generate comparator: " + className, e);
    +		}
    +		Constructor<?>[] ctors = comparatorClazz.getConstructors();
    +		assert ctors.length == 1;
    +		try {
    +			return (TypeComparator<T>) ctors[0].newInstance(new Object[]{comparators, serializer, type});
    +		} catch (Exception e) {
    +			throw new RuntimeException("Unable to instantiate comparator using: " + ctors[0].getName(), e);
    +		}
    +	}
    +
    +
    +	private void generateCode(String className) {
    +		String typeName = type.getCanonicalName();
    +		StringBuilder members = new StringBuilder();
    +		for (int i = 0; i < comparators.length; ++i) {
    +			members.append(String.format("final TypeComparator f%d;\n", i));
    +		}
    +		StringBuilder initMembers = new StringBuilder();
    +		for (int i = 0; i < comparators.length; ++i) {
    +			initMembers.append(String.format("f%d = comparators[%d];\n", i, i));
    +		}
    +		StringBuilder normalizableKeys = new StringBuilder();
    +		for (int i = 0; i < comparators.length; ++i) {
    +			normalizableKeys.append(String.format("if (f%d.supportsNormalizedKey()) {\n" +
    +				"	if (f%d.invertNormalizedKey() != inverted) break;\n" +
    +				"	nKeys++;\n" +
    +				"	final int len = f%d.getNormalizeKeyLen();\n" +
    +				"	this.normalizedKeyLengths[%d] = len;\n" +
    +				"	nKeyLen += len;\n" +
    +				"	if (nKeyLen < 0) {\n" +
    +				"		nKeyLen = Integer.MAX_VALUE;\n" +
    +				"		break;\n" +
    +				"	}\n" +
    +				"} else {\n" +
    +				"	break;\n" +
    +				"}\n", i, i, i, i));
    +		}
    +		StringBuilder cloneMembers = new StringBuilder();
    +		for (int i = 0; i < comparators.length; ++i) {
    +			cloneMembers.append(String.format("f%d = toClone.f%d.duplicate();\n", i, i));
    +		}
    +		StringBuilder flatComparators = new StringBuilder();
    +		for (int i = 0; i < comparators.length; ++i) {
    +			flatComparators.append(String.format(
    +				"if(f%d instanceof CompositeTypeComparator) {\n" +
    +				"	((CompositeTypeComparator)f%d).getFlatComparator(flatComparators);\n" +
    +				"} else {\n" +
    +				"	flatComparators.add(f%d);\n" +
    +				"}\n", i, i, i));
    +		}
    +		StringBuilder hashMembers = new StringBuilder();
    +		for (int i = 0; i < keyFields.length; ++i) {
    +			hashMembers.append(String.format(
    +				"code *= TupleComparatorBase.HASH_SALT[%d & 0x1F];\n" +
    +				"code += this.f%d.hash(((" + typeName + ")value)." + accessStringForField(keyFields[i]) +
    +					");\n",
    +				i, i));
    +		}
    +		StringBuilder setReference = new StringBuilder();
    +		for (int i = 0; i < comparators.length; ++i) {
    +			setReference.append(String.format(
    +				"this.f%d.setReference(((" + typeName + ")toCompare)." + accessStringForField(keyFields[i]) + ");\n",
    +				i));
    +		}
    +		StringBuilder equalToReference = new StringBuilder();
    +		for (int i = 0; i < comparators.length; ++i) {
    +			equalToReference.append(String.format(
    +				"if (!this.f%d.equalToReference(((" + typeName + ")candidate)." +
    +				accessStringForField(keyFields[i]) + ")) {\n" +
    +				"	return false;\n" +
    +				"}\n", i));
    +		}
    +		StringBuilder compareToReference = new StringBuilder();
    +		for (int i = 0; i < comparators.length; ++i) {
    +			compareToReference.append(String.format(
    +				"cmp = this.f%d.compareToReference(other.f%d);\n" +
    +				"if (cmp != 0) {\n" +
    +				"	return cmp;\n" +
    +				"}\n", i, i));
    +		}
    +		StringBuilder compareFields = new StringBuilder();
    +		for (int i = 0; i < comparators.length; ++i) {
    +			compareFields.append(String.format(
    +				"cmp = f%d.compare(((" + typeName + ")first)." + accessStringForField(keyFields[i]) + "," +
    +				"((" + typeName + ")second)." + accessStringForField(keyFields[i]) + ");\n" +
    +				"if (cmp != 0) {\n" +
    +					"return cmp;\n" +
    +				"}\n", i));
    +		}
    +		StringBuilder putNormalizedKeys = new StringBuilder();
    +		for (int i = 0; i < comparators.length; ++i) {
    +			putNormalizedKeys.append(String.format("if (%d >= numLeadingNormalizableKeys || numBytes <= 0) break;\n" +
    --- End diff --
    
    Instead of the `%d >= numLeadingNormalizableKeys` check, you could have the for loop go up to `numLeadingNormalizableKeys`.
    
    Also, why is this a `break` (inside a while-false loop in the ftl) instead of just returning here?
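
One possible reading of this suggestion, as a rough sketch: if the number of leading normalizable keys were known at generation time (an assumption; in the PR it appears to be computed inside the generated class), the generator could emit exactly that many snippets and drop the per-snippet index check. The snippet body below is a placeholder, since the real one is not shown in this diff, and emitting `return` assumes nothing else follows in the generated method.

```java
public class NormalizedKeySnippets {
    // Emits one snippet per leading normalizable key. The loop bound
    // replaces the "%d >= numLeadingNormalizableKeys" guard, and the
    // emitted code returns instead of breaking out of a wrapper loop.
    public static String generate(int numLeadingNormalizableKeys) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < numLeadingNormalizableKeys; ++i) {
            sb.append(String.format(
                "if (numBytes <= 0) return;\n" +
                "// placeholder: write the normalized key of f%d here\n", i));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(generate(2));
    }
}
```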



[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r78775276
  
    --- Diff: flink-core/src/main/resources/PojoSerializerTemplate.ftl ---
    @@ -0,0 +1,445 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *	 http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package ${packageName};
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +import java.lang.reflect.Modifier;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedHashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Objects;
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.java.typeutils.TypeExtractor;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +
    +public final class ${className} extends TypeSerializer {
    +	private static byte IS_NULL = 1;
    +	private static byte NO_SUBCLASS = 2;
    +	private static byte IS_SUBCLASS = 4;
    +	private static byte IS_TAGGED_SUBCLASS = 8;
    +	private int numFields;
    +	private ExecutionConfig executionConfig;
    +	private Map<Class, TypeSerializer> subclassSerializerCache;
    +	private final Map<Class, Integer> registeredClasses;
    +	private final TypeSerializer[] registeredSerializers;
    +	Class clazz;
    +	<#list members as m>
    +	${m}
    +	</#list>
    +
    +	public ${className}(Class clazz, TypeSerializer[] serializerFields, ExecutionConfig e) {
    +		this.clazz = clazz;
    +		executionConfig = e;
    +		this.numFields = serializerFields.length;
    +		LinkedHashSet<Class> registeredPojoTypes = executionConfig.getRegisteredPojoTypes();
    +		subclassSerializerCache = new HashMap<Class, TypeSerializer>();
    +		List<Class> cleanedTaggedClasses = new ArrayList<Class>(registeredPojoTypes.size());
    +		for (Class registeredClass: registeredPojoTypes) {
    +			if (registeredClass.equals(clazz)) {
    +				continue;
    +			}
    +			if (!clazz.isAssignableFrom(registeredClass)) {
    +				continue;
    +			}
    +			cleanedTaggedClasses.add(registeredClass);
    +		}
    +		this.registeredClasses = new LinkedHashMap<Class, Integer>(cleanedTaggedClasses.size());
    +		registeredSerializers = new TypeSerializer[cleanedTaggedClasses.size()];
    +		int id = 0;
    +		for (Class registeredClass: cleanedTaggedClasses) {
    +			this.registeredClasses.put(registeredClass, id);
    +			TypeInformation typeInfo = TypeExtractor.createTypeInfo(registeredClass);
    +			registeredSerializers[id] = typeInfo.createSerializer(executionConfig);
    +			id++;
    +		}
    +		<#list initMembers as m>
    +		${m}
    +		</#list>
    +	}
    +
    +	private TypeSerializer getSubclassSerializer(Class subclass) {
    +		TypeSerializer result = (TypeSerializer)subclassSerializerCache.get(subclass);
    +		if (result == null) {
    +			TypeInformation typeInfo = TypeExtractor.createTypeInfo(subclass);
    +			result = typeInfo.createSerializer(executionConfig);
    +			subclassSerializerCache.put(subclass, result);
    +		}
    +		return result;
    +	}
    +
    +	@Override
    +	public boolean isImmutableType() { return false; }
    +
    +	@Override
    +	public ${className} duplicate() {
    +		boolean stateful = false;
    +		TypeSerializer[] duplicateFieldSerializers = new TypeSerializer[numFields];
    +		<#list duplicateSerializers as ds>
    +		${ds}
    +		</#list>
    +		if (stateful) {
    +			return new ${className}(clazz, duplicateFieldSerializers, executionConfig);
    +		} else {
    +			return this;
    +		}
    +	}
    +
    +	@Override
    +	public ${typeName} createInstance() {
    +		<#if alwaysNull == "true">
    +		return null;
    +		</#if>
    +		<#if alwaysNull != "true">
    +		${typeName} t = new ${typeName}();
    +		initializeFields(t);
    +		return t;
    +		</#if>
    +	}
    +
    +	protected void initializeFields(${typeName} t) {
    +		<#list createFields as cf>
    +		${cf}
    +		</#list>
    +	}
    +
    +	@Override
    +	public int getLength() {  return -1; } // TODO: make it smarter based on annotations?
    +
    +	<#if isFinal == "true">
    +	@Override
    +	public ${typeName} copy(Object from) {
    +		if (from == null) return null;
    +		<#if alwaysNull == "true">
    +		${typeName} target = null;
    +		</#if>
    +		<#if alwaysNull != "true">
    +		${typeName} target = new ${typeName}();
    --- End diff --
    
    Use `createInstance` here, because of `alwaysNull`
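
A sketch of what the generated `copy` could look like if it delegated to `createInstance`, so that the `alwaysNull` branching lives in one place only. The `Point` type and the serializer class are hypothetical; a real generated class would extend `TypeSerializer` and copy fields through the field serializers.

```java
public class CopyViaCreateInstance {
    // Hypothetical POJO (not from the PR).
    public static class Point {
        public int x;
        public int y;
    }

    // Hand-written stand-in for a generated serializer.
    public static class PointSerializerSketch {
        // In the template this is the only method that would need to
        // distinguish the alwaysNull case.
        public Point createInstance() {
            return new Point();
        }

        // copy() obtains its target from createInstance() instead of
        // repeating the instantiation logic.
        public Point copy(Point from) {
            if (from == null) {
                return null;
            }
            Point target = createInstance();
            target.x = from.x;
            target.y = from.y;
            return target;
        }
    }

    public static void main(String[] args) {
        Point p = new Point();
        p.x = 1;
        p.y = 2;
        Point c = new PointSerializerSketch().copy(p);
        System.out.println(c.x + "," + c.y);
    }
}
```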



[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r78138683
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerGenerator.java ---
    @@ -0,0 +1,242 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *	 http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.api.java.typeutils.runtime;
    +
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +
    +import org.apache.flink.util.InstantiationUtil;
    +
    +import java.io.IOException;
    +import java.lang.reflect.Field;
    +import java.lang.reflect.Modifier;
    +import java.util.HashMap;
    +import java.util.Map;
    +
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +import static org.apache.flink.api.java.typeutils.PojoTypeInfo.accessStringForField;
    +import static org.apache.flink.api.java.typeutils.PojoTypeInfo.modifyStringForField;
    +
    +public final class PojoSerializerGenerator<T> {
    +	private static final String packageName = "org.apache.flink.api.java.typeutils.runtime.generated";
    +
    +	private final Class<T> clazz;
    +	private final Field[] refFields;
    +	private final TypeSerializer<?>[] fieldSerializers;
    +	private final ExecutionConfig config;
    +	private String code;
    +
    +	public PojoSerializerGenerator(
    +		Class<T> clazz,
    +		TypeSerializer<?>[] fields,
    +		Field[] reflectiveFields,
    +		ExecutionConfig config) {
    +		this.clazz = checkNotNull(clazz);
    +		this.refFields = checkNotNull(reflectiveFields);
    +		this.fieldSerializers = checkNotNull(fields);
    +		this.config = checkNotNull(config);
    +		for (int i = 0; i < this.refFields.length; i++) {
    +			this.refFields[i].setAccessible(true);
    +		}
    +	}
    +
    +	public TypeSerializer<T> createSerializer()  {
    +		final String className = clazz.getCanonicalName().replace('.', '_') + "_GeneratedSerializer";
    +		final String fullClassName = packageName + "." + className;
    +		code = InstantiationUtil.getCodeForCachedClass(fullClassName);
    +		if (code == null) {
    +			generateCode(className);
    +		}
    +		return new GenTypeSerializerProxy<>(clazz, fullClassName, code, fieldSerializers, config);
    +	}
    +
    +	private void generateCode(String className) {
    +		assert fieldSerializers.length > 0;
    +		String typeName = clazz.getCanonicalName();
    +		StringBuilder members = new StringBuilder();
    +		for (int i = 0; i < fieldSerializers.length; ++i) {
    +			members.append(String.format("final TypeSerializer f%d;\n", i));
    --- End diff --
    
    You could specialize the type here to the appropriate descendant of `TypeSerializer`. This will very likely make method calls on `f%d` faster, because of better devirtualization.
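
A minimal sketch of how the member declarations could be specialized, assuming the concrete serializer classes are public and have a canonical name (anonymous and local classes do not, and would need a fallback to the base type). `Ser`, `IntSer` and `StringSer` are hypothetical stand-ins for `TypeSerializer` and its descendants.

```java
public class SpecializedMembers {
    // Stand-ins for TypeSerializer and two concrete serializers.
    public interface Ser { }
    public static final class IntSer implements Ser { }
    public static final class StringSer implements Ser { }

    // Emits one field declaration per serializer, typed with the
    // runtime class of the serializer instead of the base type.
    public static String members(Ser[] fieldSerializers) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < fieldSerializers.length; ++i) {
            String type = fieldSerializers[i].getClass().getCanonicalName();
            sb.append(String.format("final %s f%d;\n", type, i));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(members(new Ser[] { new IntSer(), new StringSer() }));
    }
}
```

With this change the generated initializers would also need a matching cast when assigning from the `TypeSerializer[]` array.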



[GitHub] flink issue #2211: [WIP][FLINK-3599] Code generation for PojoSerializer and ...

Posted by Xazax-hun <gi...@git.apache.org>.
Github user Xazax-hun commented on the issue:

    https://github.com/apache/flink/pull/2211
  
    Hi!
    This patch depends on the following pull request: https://github.com/apache/flink/pull/2094
    Once it has landed, I will remove the [WIP] tag. I have not removed it yet because I did not want the reviewer to review changes that were not made by me.



[GitHub] flink issue #2211: [WIP][FLINK-3599] Code generation for PojoSerializer and ...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the issue:

    https://github.com/apache/flink/pull/2211
  
    Cool proposal! I didn't look at the code, but have a more general question: Since this was done as part of a now finished GSoC, I'm wondering what your plans are (given that this part is reviewed etc.) for the 2nd part of the implementation (distributing the code, etc.). Do you think you will have time to get to that part or is it unrealistic given that GSoC is over?



[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r78002589
  
    --- Diff: flink-core/src/main/resources/PojoSerializerTemplate.ftl ---
    @@ -0,0 +1,372 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *	 http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package ${packageName};
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +import java.lang.reflect.Modifier;
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.HashMap;
    +import java.util.LinkedHashMap;
    +import java.util.LinkedHashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Objects;
    +import org.apache.flink.api.common.ExecutionConfig;
    +import org.apache.flink.api.common.typeinfo.TypeInformation;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.java.typeutils.TypeExtractor;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +
    +public final class ${className} extends TypeSerializer {
    +	private static byte IS_NULL = 1;
    +	private static byte NO_SUBCLASS = 2;
    +	private static byte IS_SUBCLASS = 4;
    +	private static byte IS_TAGGED_SUBCLASS = 8;
    +	private int numFields;
    +	private ExecutionConfig executionConfig;
    +	private Map<Class, TypeSerializer> subclassSerializerCache;
    +	private final Map<Class, Integer> registeredClasses;
    +	private final TypeSerializer[] registeredSerializers;
    +	Class clazz;
    +	<#list members as m>
    +	${m}
    +	</#list>
    +	public ${className}(Class clazz, TypeSerializer[] serializerFields, ExecutionConfig e) {
    +		this.clazz = clazz;
    +		executionConfig = e;
    +		this.numFields = serializerFields.length;
    +		LinkedHashSet<Class> registeredPojoTypes = executionConfig.getRegisteredPojoTypes();
    +		subclassSerializerCache = new HashMap<Class, TypeSerializer>();
    +		List<Class> cleanedTaggedClasses = new ArrayList<Class>(registeredPojoTypes.size());
    +		for (Class registeredClass: registeredPojoTypes) {
    +			if (registeredClass.equals(clazz)) {
    +				continue;
    +			}
    +			if (!clazz.isAssignableFrom(registeredClass)) {
    +				continue;
    +			}
    +			cleanedTaggedClasses.add(registeredClass);
    +		}
    +		this.registeredClasses = new LinkedHashMap<Class, Integer>(cleanedTaggedClasses.size());
    +		registeredSerializers = new TypeSerializer[cleanedTaggedClasses.size()];
    +		int id = 0;
    +		for (Class registeredClass: cleanedTaggedClasses) {
    +			this.registeredClasses.put(registeredClass, id);
    +			TypeInformation typeInfo = TypeExtractor.createTypeInfo(registeredClass);
    +			registeredSerializers[id] = typeInfo.createSerializer(executionConfig);
    +			id++;
    +		}
    +		<#list initMembers as m>
    +		${m}
    +		</#list>
    +	}
    +	private TypeSerializer getSubclassSerializer(Class subclass) {
    +		TypeSerializer result = (TypeSerializer)subclassSerializerCache.get(subclass);
    +		if (result == null) {
    +			TypeInformation typeInfo = TypeExtractor.createTypeInfo(subclass);
    +			result = typeInfo.createSerializer(executionConfig);
    +			subclassSerializerCache.put(subclass, result);
    +		}
    +		return result;
    +	}
    +	public boolean isImmutableType() { return false; }
    +	public ${className} duplicate() {
    +		boolean stateful = false;
    +		TypeSerializer[] duplicateFieldSerializers = new TypeSerializer[numFields];
    +		<#list duplicateSerializers as ds>
    +		${ds}
    +		</#list>
    +		if (stateful) {
    +			return new ${className}(clazz, duplicateFieldSerializers, executionConfig);
    +		} else {
    +			return this;
    +		}
    +	}
    +	public ${typeName} createInstance() {
    +		if (clazz.isInterface() || Modifier.isAbstract(clazz.getModifiers())) {
    +			return null;
    +		}
    +		try {
    +			${typeName} t = (${typeName})clazz.newInstance();
    +			initializeFields(t);
    +			return t;
    +		}
    +		catch (Exception e) {
    +			throw new RuntimeException("Cannot instantiate class.", e);
    +		}
    +	}
    +	protected void initializeFields(${typeName} t) {
    +		<#list createFields as cf>
    +		${cf}
    +		</#list>
    +	}
    +	public ${typeName} copy(Object from) {
    +		if (from == null) return null;
    +		Class<?> actualType = from.getClass();
    +		${typeName} target;
    +		if (actualType == clazz) {
    +			try {
    +				target = (${typeName}) from.getClass().newInstance();
    --- End diff --
    
    You can also use `new` here, avoiding reflection (since `actualType == clazz`).
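
For illustration, a sketch of the exact-type branch using a direct constructor call. The `Word` type is hypothetical, and the subclass path is left out because it is not shown in this part of the diff.

```java
public class CopyWithoutReflection {
    // Hypothetical POJO (not from the PR).
    public static class Word {
        public String text;
        public int count;
    }

    // When the runtime type is exactly Word, the generated code knows
    // the type statically and can call the constructor directly.
    public static Word copy(Word from) {
        if (from == null) {
            return null;
        }
        if (from.getClass() == Word.class) {
            Word target = new Word();   // no Class.newInstance() needed
            target.text = from.text;
            target.count = from.count;
            return target;
        }
        // Subclass handling elided in this sketch.
        throw new UnsupportedOperationException("subclass copy not sketched");
    }

    public static void main(String[] args) {
        Word w = new Word();
        w.text = "flink";
        w.count = 2;
        Word c = copy(w);
        System.out.println(c.text + " " + c.count);
    }
}
```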



[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r77987928
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java ---
    @@ -70,10 +77,41 @@
     	private static final Pattern PATTERN_NESTED_FIELDS = Pattern.compile(REGEX_NESTED_FIELDS);
     	private static final Pattern PATTERN_NESTED_FIELDS_WILDCARD = Pattern.compile(REGEX_NESTED_FIELDS_WILDCARD);
     
    +	private static final Map<Class<?>, Class<? extends TypeSerializer>> customSerializers = new HashMap<>();
    +	private static final Map<Tuple2<ArrayList<Integer>, Class>, Class<? extends TypeComparator>> customComparators =
    +		new HashMap<>();
    +
     	private final PojoField[] fields;
     	
     	private final int totalFields;
     
    +	/**
    +	 * Register a custom serializer for a type. The precedence of the serializers
    +	 * is the following (highest to lowest): Kryo, Avro, Custom, Generated, Flink.
    +	 * The chosen serializer will be the first one from the list that is turned on.
    --- End diff --
    
    I'm also not sure about putting "Custom" after "Kryo" and "Avro". The reason for having a custom serializer is, more or less, that I want to bypass all this mess. Or, to reason from another angle: why would I have a custom serializer if it doesn't work and Kryo has to take over?
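
To make the concern concrete, here is a small sketch of "first enabled entry wins" selection. All names (`PrecedenceSketch`, `Kind`, `choose`) are hypothetical and do not come from the pull request. The `CUSTOM_FIRST` order is only one possible reading of the suggestion, not an agreed design.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

class PrecedenceSketch {
    enum Kind { KRYO, AVRO, CUSTOM, GENERATED, FLINK }

    // Order stated in the Javadoc under review (highest to lowest).
    static final List<Kind> DOCUMENTED =
        Arrays.asList(Kind.KRYO, Kind.AVRO, Kind.CUSTOM, Kind.GENERATED, Kind.FLINK);

    // Assumed alternative: a registered custom serializer takes priority over everything.
    static final List<Kind> CUSTOM_FIRST =
        Arrays.asList(Kind.CUSTOM, Kind.KRYO, Kind.AVRO, Kind.GENERATED, Kind.FLINK);

    // Returns the first kind in the precedence list that is turned on.
    static Kind choose(List<Kind> precedence, Set<Kind> enabled) {
        for (Kind kind : precedence) {
            if (enabled.contains(kind)) {
                return kind;
            }
        }
        return Kind.FLINK; // fall back to the built-in serializer
    }

    public static void main(String[] args) {
        Set<Kind> enabled = EnumSet.of(Kind.KRYO, Kind.CUSTOM);
        System.out.println(choose(DOCUMENTED, enabled));   // custom serializer is ignored
        System.out.println(choose(CUSTOM_FIRST, enabled)); // custom serializer is used
    }
}
```

With the documented order, forcing Kryo silently shadows a serializer the user registered explicitly for that type, which is the behavior being questioned.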



[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r78000250
  
    --- Diff: flink-core/src/main/resources/PojoComparatorTemplate.ftl ---
    @@ -0,0 +1,195 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *	 http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package ${packageName};
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +import java.util.List;
    +import org.apache.flink.api.common.typeutils.CompositeTypeComparator;
    +import org.apache.flink.api.common.typeutils.TypeComparator;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.MemorySegment;
    +import org.apache.flink.types.NullKeyFieldException;
    +import org.apache.flink.api.java.typeutils.runtime.TupleComparatorBase;
    +import org.apache.flink.api.java.typeutils.runtime.GenTypeComparatorProxy;
    +import org.apache.flink.util.InstantiationUtil;
    +
    +public final class ${className} extends CompositeTypeComparator implements java.io.Serializable {
    --- End diff --
    
    Maybe add `<${className}>` as a generic argument to `CompositeTypeComparator`.



[GitHub] flink pull request #2211: [WIP][FLINK-3599] Code generation for PojoSerializ...

Posted by ggevay <gi...@git.apache.org>.
Github user ggevay commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2211#discussion_r77983346
  
    --- Diff: flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java ---
    @@ -455,6 +459,33 @@ public boolean isForceAvroEnabled() {
     		return forceAvro;
     	}
     
    +    /**
    +     * Force Flink to use the generated serializers for POJOs.
    --- End diff --
    
    Update the comment to include comparators as well.

