You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Flavio Pompermaier (JIRA)" <ji...@apache.org> on 2016/09/30 09:29:20 UTC

[jira] [Commented] (FLINK-4719) KryoSerializer random exception

    [ https://issues.apache.org/jira/browse/FLINK-4719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15535535#comment-15535535 ] 

Flavio Pompermaier commented on FLINK-4719:
-------------------------------------------

With the stock Flink 1.1.1 code my job fails frequently; using the following code for the KryoSerializer instead greatly decreases the frequency of this exception.
I hope this helps in solving the problem:

{code:java}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.api.java.typeutils.runtime.kryo;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.JavaSerializer;

import org.apache.avro.generic.GenericData;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput;
import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers.SpecificInstanceCollectionSerializerForArrayList;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

import org.objenesis.strategy.StdInstantiatorStrategy;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * A type serializer that serializes its type using the Kryo serialization
 * framework (https://github.com/EsotericSoftware/kryo).
 *
 * <p>This serializer is intended as a fallback serializer for the cases that are
 * not covered by the basic types, tuples, and POJOs.
 *
 * <p>The underlying {@link Kryo} instance is created lazily on first use and is not
 * carried over by {@link #duplicate()}: every duplicate builds its own instance.
 * A fresh Kryo {@link Input}/{@link Output} is created for every call to
 * {@link #deserialize(DataInputView)} / {@link #serialize(Object, DataOutputView)},
 * so no buffered bytes survive from one record to the next.
 *
 * @param <T> The type to be serialized.
 */
public class KryoSerializer<T> extends TypeSerializer<T> {

	private static final long serialVersionUID = 3L;

	private static final Logger LOG = LoggerFactory.getLogger(KryoSerializer.class);

	// ------------------------------------------------------------------------
	// Kryo configuration taken from the ExecutionConfig (shared by reference with duplicates).

	private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> registeredTypesWithSerializers;
	private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithSerializerClasses;
	private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> defaultSerializers;
	private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> defaultSerializerClasses;
	private final LinkedHashSet<Class<?>> registeredTypes;

	/** The class handled by this serializer; never null. */
	private final Class<T> type;

	// ------------------------------------------------------------------------
	// The fields below are lazily initialized after duplication or deserialization.

	/** Fully configured Kryo instance; null until {@link #checkKryoInitialized()} has completed. */
	private transient Kryo kryo;

	/** Instance handed as the "reuse" object in {@link #copy(DataInputView, DataOutputView)}. */
	private transient T copyInstance;

	// ------------------------------------------------------------------------

	/**
	 * Creates a serializer for the given type, taking all Kryo registrations and
	 * default serializers from the given execution config.
	 *
	 * @param type The class to be serialized; must not be null.
	 * @param executionConfig The config providing the Kryo registrations.
	 * @throws NullPointerException if {@code type} is null.
	 */
	public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){
		this.type = checkNotNull(type);

		this.defaultSerializers = executionConfig.getDefaultKryoSerializers();
		this.defaultSerializerClasses = executionConfig.getDefaultKryoSerializerClasses();
		this.registeredTypesWithSerializers = executionConfig.getRegisteredTypesWithKryoSerializers();
		this.registeredTypesWithSerializerClasses = executionConfig.getRegisteredTypesWithKryoSerializerClasses();
		this.registeredTypes = executionConfig.getRegisteredKryoTypes();
	}

	/**
	 * Copy-constructor that does not copy transient fields. They will be initialized once required.
	 *
	 * @param toCopy The serializer whose configuration is taken over.
	 * @throws NullPointerException if the type of {@code toCopy} is null.
	 */
	protected KryoSerializer(KryoSerializer<T> toCopy) {
		registeredTypesWithSerializers = toCopy.registeredTypesWithSerializers;
		registeredTypesWithSerializerClasses = toCopy.registeredTypesWithSerializerClasses;
		defaultSerializers = toCopy.defaultSerializers;
		defaultSerializerClasses = toCopy.defaultSerializerClasses;
		registeredTypes = toCopy.registeredTypes;

		type = checkNotNull(toCopy.type, "Type class cannot be null.");
	}

	// ------------------------------------------------------------------------

	@Override
	public boolean isImmutableType() {
		return false;
	}

	/**
	 * Returns a new serializer with the same configuration. The transient Kryo
	 * instance is not copied; the duplicate creates its own on first use.
	 */
	@Override
	public KryoSerializer<T> duplicate() {
		return new KryoSerializer<T>(this);
	}

	/**
	 * Tries to instantiate the serialized type through Kryo.
	 *
	 * @return A new instance, or {@code null} if the type is abstract, an interface,
	 *         or could not be instantiated.
	 */
	@Override
	public T createInstance() {
		if(Modifier.isAbstract(type.getModifiers()) || Modifier.isInterface(type.getModifiers()) ) {
			return null;
		} else {
			checkKryoInitialized();
			try {
				return kryo.newInstance(type);
			} catch(Throwable e) {
				// Best effort by design: callers get null when no instance can be created.
				LOG.debug("Could not create an instance of {} via Kryo.", type, e);
				return null;
			}
		}
	}

	/**
	 * Creates a deep copy of the given object. Uses {@code Kryo.copy} first and falls
	 * back to an in-memory serialize/deserialize round trip if that fails with a
	 * {@link KryoException}.
	 *
	 * @param from The object to copy; may be null.
	 * @return The copy, or {@code null} if {@code from} is null.
	 */
	@SuppressWarnings("unchecked")
	@Override
	public T copy(T from) {
		if (from == null) {
			return null;
		}
		checkKryoInitialized();
		try {
			return kryo.copy(from);
		}
		catch(KryoException ke) {
			// kryo was unable to copy it, so we do it through serialization:
			ByteArrayOutputStream baout = new ByteArrayOutputStream();

			// closing the Output flushes its buffer into baout before the bytes are read back
			try (Output output = new Output(baout)) {
				kryo.writeObject(output, from);
			}

			try (Input input = new Input(new ByteArrayInputStream(baout.toByteArray()))) {
				return (T) kryo.readObject(input, from.getClass());
			}
		}
	}

	/** Ignores {@code reuse} and delegates to {@link #copy(Object)}. */
	@Override
	public T copy(T from, T reuse) {
		return copy(from);
	}

	/** Returns -1: this serializer does not report a fixed record length. */
	@Override
	public int getLength() {
		return -1;
	}

	/**
	 * Writes the record (class information and object) to the target through a
	 * fresh Kryo {@link Output}, which is flushed and closed before returning.
	 *
	 * <p>If both the write and the subsequent close fail, the write failure is
	 * reported and the close failure is attached as a suppressed exception.
	 *
	 * @param record The record to write.
	 * @param target The view to write to.
	 * @throws EOFException if Kryo failed because of an {@link EOFException}.
	 * @throws KryoException for any other Kryo failure.
	 */
	@Override
	public void serialize(T record, DataOutputView target) throws IOException {
		checkKryoInitialized();

		try (Output output = new Output(new DataOutputViewStream(target))) {
			kryo.writeClassAndObject(output, record);
		}
		catch (KryoException ke) {
			throw unwrapEofException(ke);
		}
	}

	/**
	 * Reads one record (class information and object) from the source through a
	 * fresh {@link NoFetchingInput}, which is closed before returning.
	 *
	 * <p>If both the read and the subsequent close fail, the read failure is
	 * reported and the close failure is attached as a suppressed exception.
	 *
	 * @param source The view to read from.
	 * @return The deserialized record.
	 * @throws EOFException if Kryo failed because of an {@link EOFException}.
	 * @throws KryoException for any other Kryo failure.
	 */
	@SuppressWarnings("unchecked")
	@Override
	public T deserialize(DataInputView source) throws IOException {
		checkKryoInitialized();

		try (Input input = new NoFetchingInput(new DataInputViewStream(source))) {
			return (T) kryo.readClassAndObject(input);
		}
		catch (KryoException ke) {
			throw unwrapEofException(ke);
		}
	}

	/** Ignores {@code reuse} and delegates to {@link #deserialize(DataInputView)}. */
	@Override
	public T deserialize(T reuse, DataInputView source) throws IOException {
		return deserialize(source);
	}

	/**
	 * Copies one record from source to target by deserializing and re-serializing it.
	 *
	 * @param source The view to read the record from.
	 * @param target The view to write the record to.
	 * @throws IOException see {@link #deserialize(DataInputView)} and
	 *         {@link #serialize(Object, DataOutputView)}.
	 */
	@Override
	public void copy(DataInputView source, DataOutputView target) throws IOException {
		checkKryoInitialized();
		if(this.copyInstance == null){
			this.copyInstance = createInstance();
		}

		T tmp = deserialize(copyInstance, source);
		serialize(tmp, target);
	}

	// --------------------------------------------------------------------------------------------

	@Override
	public int hashCode() {
		return Objects.hash(
			type,
			registeredTypes,
			registeredTypesWithSerializerClasses,
			defaultSerializerClasses);
	}

	@Override
	public boolean equals(Object obj) {
		if (obj instanceof KryoSerializer) {
			KryoSerializer<?> other = (KryoSerializer<?>) obj;

			// we cannot include the Serializers here because they don't implement the equals method
			return other.canEqual(this) &&
				type == other.type &&
				registeredTypes.equals(other.registeredTypes) &&
				registeredTypesWithSerializerClasses.equals(other.registeredTypesWithSerializerClasses) &&
				defaultSerializerClasses.equals(other.defaultSerializerClasses);
		} else {
			return false;
		}
	}

	@Override
	public boolean canEqual(Object obj) {
		return obj instanceof KryoSerializer;
	}

	// --------------------------------------------------------------------------------------------

	/**
	 * Surfaces an {@link EOFException} that Kryo wrapped into a {@link KryoException},
	 * so that callers see the original I/O exception instead of the Kryo wrapper.
	 *
	 * @param ke The exception thrown by Kryo.
	 * @return {@code ke} itself if its cause is not an {@link EOFException}; returned
	 *         (rather than thrown) so callers can write {@code throw unwrapEofException(ke)}.
	 * @throws EOFException the cause of {@code ke}, if it is an {@link EOFException}.
	 */
	private static KryoException unwrapEofException(KryoException ke) throws EOFException {
		Throwable cause = ke.getCause();
		if (cause instanceof EOFException) {
			throw (EOFException) cause;
		}
		return ke;
	}

	/**
	 * Returns the Chill Kryo Serializer which is implicitly added to the classpath via flink-runtime.
	 * Falls back to the default Kryo serializer if it can't be found.
	 * @return The Kryo serializer instance.
	 */
	private Kryo getKryoInstance() {

		try {
			// check if ScalaKryoInstantiator is in class path (coming from Twitter's Chill library).
			// This will be true if Flink's Scala API is used.
			Class<?> chillInstantiatorClazz = Class.forName("com.twitter.chill.ScalaKryoInstantiator");
			Object chillInstantiator = chillInstantiatorClazz.newInstance();

			// obtain a Kryo instance through Twitter Chill
			Method m = chillInstantiatorClazz.getMethod("newKryo");

			return (Kryo) m.invoke(chillInstantiator);
		} catch (ClassNotFoundException | InstantiationException | NoSuchMethodException |
			IllegalAccessException | InvocationTargetException e) {

			LOG.warn("Falling back to default Kryo serializer because Chill serializer couldn't be found.", e);

			// try the default instantiation first and fall back to Objenesis' StdInstantiatorStrategy
			Kryo.DefaultInstantiatorStrategy initStrategy = new Kryo.DefaultInstantiatorStrategy();
			initStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());

			Kryo kryo = new Kryo();
			kryo.setInstantiatorStrategy(initStrategy);

			return kryo;
		}
	}

	/**
	 * Lazily creates and configures the Kryo instance of this serializer.
	 *
	 * <p>The instance is configured through a local variable and only stored in the
	 * {@code kryo} field once every registration succeeded. If configuration fails,
	 * the field stays null and the next call retries, instead of silently working
	 * with a partially configured instance.
	 */
	private void checkKryoInitialized() {
		if (this.kryo != null) {
			return;
		}

		Kryo newKryo = getKryoInstance();

		// Enable reference tracking.
		newKryo.setReferences(true);

		// Throwable and all subclasses should be serialized via java serialization
		newKryo.addDefaultSerializer(Throwable.class, new JavaSerializer());

		// Add default serializers first, so that the type registrations without a serializer
		// are registered with a default serializer
		for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> entry: defaultSerializers.entrySet()) {
			newKryo.addDefaultSerializer(entry.getKey(), entry.getValue().getSerializer());
		}

		for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> entry: defaultSerializerClasses.entrySet()) {
			newKryo.addDefaultSerializer(entry.getKey(), entry.getValue());
		}

		// register the type of our class
		newKryo.register(type);

		// register given types. we do this first so that any registration of a
		// more specific serializer overrides this
		for (Class<?> registeredType : registeredTypes) {
			newKryo.register(registeredType);
		}

		// register given serializer classes
		for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> e : registeredTypesWithSerializerClasses.entrySet()) {
			Class<?> typeClass = e.getKey();
			Class<? extends Serializer<?>> serializerClass = e.getValue();

			Serializer<?> serializer =
					ReflectionSerializerFactory.makeSerializer(newKryo, serializerClass, typeClass);
			newKryo.register(typeClass, serializer);
		}

		// register given serializers
		for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> e : registeredTypesWithSerializers.entrySet()) {
			newKryo.register(e.getKey(), e.getValue().getSerializer());
		}
		// this is needed for Avro but can not be added on demand.
		newKryo.register(GenericData.Array.class, new SpecificInstanceCollectionSerializerForArrayList());

		newKryo.setRegistrationRequired(false);
		newKryo.setClassLoader(Thread.currentThread().getContextClassLoader());

		// publish only the completely configured instance
		this.kryo = newKryo;
	}

	// --------------------------------------------------------------------------------------------
	// For testing
	// --------------------------------------------------------------------------------------------

	/** Returns the (lazily initialized) Kryo instance used by this serializer. */
	public Kryo getKryo() {
		checkKryoInitialized();
		return this.kryo;
	}
}
{code}

> KryoSerializer random exception
> -------------------------------
>
>                 Key: FLINK-4719
>                 URL: https://issues.apache.org/jira/browse/FLINK-4719
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 1.1.1
>            Reporter: Flavio Pompermaier
>              Labels: kryo, serialization
>
> There's a random exception that somehow involves the KryoSerializer when using POJOs in Flink jobs reading large volumes of data.
> It is thrown in several different places, e.g. (the exceptions reported here may refer to previous versions of Flink...):
> {code}
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at createResult(IndexMappingExecutor.java:42)) -> Map (Map at main(Jsonizer.java:128))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: Unable to find class: java.ttil.HashSet
>         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
>         at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: Unable to find class: java.ttil.HashSet
>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>         at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
>         at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
>         ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: Unable to find class: java.ttil.HashSet
>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
> Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: java.ttil.HashSet
>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>         at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>         at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>         at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:252)
>         at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556)
>         at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
>         at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>         at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
> Caused by: java.lang.ClassNotFoundException: java.ttil.HashSet
>         at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>         at java.lang.Class.forName0(Native Method)
>         at java.lang.Class.forName(Class.java:348)
>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
> {code}
> {code}
> Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>     at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
>     at java.util.ArrayList.elementData(ArrayList.java:418)
>     at java.util.ArrayList.get(ArrayList.java:431)
>     at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
>     at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
> {code}
> {code}
> java.lang.RuntimeException: Cannot instantiate class.
> 	at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:407)
> 	at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> 	at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
> 	at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
> 	at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> 	at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
> 	at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
> 	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
> 	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassNotFoundException: it.okkam.flink.test.model.pojo.VdhicleEvent
> 	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> 	at java.lang.Class.forName0(Native Method)
> 	at java.lang.Class.forName(Class.java:348)
> 	at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:405)
> 	... 10 more
> {code}
> {code}
> com.esotericsoftware.kryo.KryoException: Unable to find class: ^Z^A
>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
>         at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>         at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
>         at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
>         at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
>         at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
>         at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
>         at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>         at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
>         at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:96)
>         at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
>         at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>         at java.lang.Thread.run(Thread.java:745)
> {code}
> {code}
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
> 	at java.util.ArrayList.elementData(ArrayList.java:418)
> 	at java.util.ArrayList.get(ArrayList.java:431)
> 	at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
> 	at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
> 	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
> 	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
> 	at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
> 	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> 	at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
> 	at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
> 	at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
> 	at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
> 	at org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:100)
> 	at org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:161)
> 	at org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:113)
> 	at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:45)
> 	at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:130)
> 	at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.access$300(NonReusingKeyGroupedIterator.java:32)
> 	at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator.next(NonReusingKeyGroupedIterator.java:192)
> 	at org.okkam.entitons.mapping.flink.IndexMappingExecutor$TupleToEntitonJsonNode.reduce(IndexMappingExecutor.java:64)
> 	at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
> 	at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
> 	at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)