You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Flavio Pompermaier (JIRA)" <ji...@apache.org> on 2016/09/30 09:29:20 UTC
[jira] [Commented] (FLINK-4719) KryoSerializer random exception
[ https://issues.apache.org/jira/browse/FLINK-4719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15535535#comment-15535535 ]
Flavio Pompermaier commented on FLINK-4719:
-------------------------------------------
Using the Flink 1.1.1 code my job fails frequently, instead using the following code for the KryoSerializer decrease a lot the frequency of such Exception.
I hope this could help in solving the problem:
{code:java}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.api.java.typeutils.runtime.kryo;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import org.apache.avro.generic.GenericData;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
import org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream;
import org.apache.flink.api.java.typeutils.runtime.NoFetchingInput;
import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers.SpecificInstanceCollectionSerializerForArrayList;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.objenesis.strategy.StdInstantiatorStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* A type serializer that serializes its type using the Kryo serialization
* framework (https://github.com/EsotericSoftware/kryo).
*
* This serializer is intended as a fallback serializer for the cases that are
* not covered by the basic types, tuples, and POJOs.
*
* @param <T> The type to be serialized.
*/
public class KryoSerializer<T> extends TypeSerializer<T> {
private static final long serialVersionUID = 3L;
private static final Logger LOG = LoggerFactory.getLogger(KryoSerializer.class);
// ------------------------------------------------------------------------
private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> registeredTypesWithSerializers;
private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithSerializerClasses;
private final LinkedHashMap<Class<?>, ExecutionConfig.SerializableSerializer<?>> defaultSerializers;
private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> defaultSerializerClasses;
private final LinkedHashSet<Class<?>> registeredTypes;
private final Class<T> type;
// ------------------------------------------------------------------------
// The fields below are lazily initialized after duplication or deserialization.
private transient Kryo kryo;
private transient T copyInstance;
// ------------------------------------------------------------------------
public KryoSerializer(Class<T> type, ExecutionConfig executionConfig){
this.type = checkNotNull(type);
this.defaultSerializers = executionConfig.getDefaultKryoSerializers();
this.defaultSerializerClasses = executionConfig.getDefaultKryoSerializerClasses();
this.registeredTypesWithSerializers = executionConfig.getRegisteredTypesWithKryoSerializers();
this.registeredTypesWithSerializerClasses = executionConfig.getRegisteredTypesWithKryoSerializerClasses();
this.registeredTypes = executionConfig.getRegisteredKryoTypes();
}
/**
* Copy-constructor that does not copy transient fields. They will be initialized once required.
*/
protected KryoSerializer(KryoSerializer<T> toCopy) {
registeredTypesWithSerializers = toCopy.registeredTypesWithSerializers;
registeredTypesWithSerializerClasses = toCopy.registeredTypesWithSerializerClasses;
defaultSerializers = toCopy.defaultSerializers;
defaultSerializerClasses = toCopy.defaultSerializerClasses;
registeredTypes = toCopy.registeredTypes;
type = toCopy.type;
if(type == null){
throw new NullPointerException("Type class cannot be null.");
}
}
// ------------------------------------------------------------------------
@Override
public boolean isImmutableType() {
return false;
}
@Override
public KryoSerializer<T> duplicate() {
return new KryoSerializer<T>(this);
}
@Override
public T createInstance() {
if(Modifier.isAbstract(type.getModifiers()) || Modifier.isInterface(type.getModifiers()) ) {
return null;
} else {
checkKryoInitialized();
try {
return kryo.newInstance(type);
} catch(Throwable e) {
return null;
}
}
}
@SuppressWarnings("unchecked")
@Override
public T copy(T from) {
if (from == null) {
return null;
}
checkKryoInitialized();
try {
return kryo.copy(from);
}
catch(KryoException ke) {
// kryo was unable to copy it, so we do it through serialization:
ByteArrayOutputStream baout = new ByteArrayOutputStream();
Output output = new Output(baout);
kryo.writeObject(output, from);
output.close();
ByteArrayInputStream bain = new ByteArrayInputStream(baout.toByteArray());
Input input = new Input(bain);
T ret = (T)kryo.readObject(input, from.getClass());
input.close();
return ret;
}
}
@Override
public T copy(T from, T reuse) {
return copy(from);
}
@Override
public int getLength() {
return -1;
}
@Override
public void serialize(T record, DataOutputView target) throws IOException {
checkKryoInitialized();
DataOutputViewStream outputStream = new DataOutputViewStream(target);
Output output = new Output(outputStream);
try {
kryo.writeClassAndObject(output, record);
}
catch (KryoException ke) {
Throwable cause = ke.getCause();
if (cause instanceof EOFException) {
throw (EOFException) cause;
}
else {
throw ke;
}
} finally {
try{
output.close();
} catch (KryoException ke) {
Throwable cause = ke.getCause();
if (cause instanceof EOFException) {
throw (EOFException) cause;
} else {
throw ke;
}
}
}
}
@SuppressWarnings("unchecked")
@Override
public T deserialize(DataInputView source) throws IOException {
checkKryoInitialized();
DataInputViewStream inputStream = new DataInputViewStream(source);
Input input = new NoFetchingInput(inputStream);
try {
return (T) kryo.readClassAndObject(input);
} catch (KryoException ke) {
Throwable cause = ke.getCause();
if (cause instanceof EOFException) {
throw (EOFException) cause;
} else {
throw ke;
}
} finally {
try{
input.close();
} catch (KryoException ke) {
Throwable cause = ke.getCause();
if (cause instanceof EOFException) {
throw (EOFException) cause;
} else {
throw ke;
}
}
}
}
@Override
public T deserialize(T reuse, DataInputView source) throws IOException {
return deserialize(source);
}
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
checkKryoInitialized();
if(this.copyInstance == null){
this.copyInstance = createInstance();
}
T tmp = deserialize(copyInstance, source);
serialize(tmp, target);
}
// --------------------------------------------------------------------------------------------
@Override
public int hashCode() {
return Objects.hash(
type,
registeredTypes,
registeredTypesWithSerializerClasses,
defaultSerializerClasses);
}
@Override
public boolean equals(Object obj) {
if (obj instanceof KryoSerializer) {
KryoSerializer<?> other = (KryoSerializer<?>) obj;
// we cannot include the Serializers here because they don't implement the equals method
return other.canEqual(this) &&
type == other.type &&
registeredTypes.equals(other.registeredTypes) &&
registeredTypesWithSerializerClasses.equals(other.registeredTypesWithSerializerClasses) &&
defaultSerializerClasses.equals(other.defaultSerializerClasses);
} else {
return false;
}
}
@Override
public boolean canEqual(Object obj) {
return obj instanceof KryoSerializer;
}
// --------------------------------------------------------------------------------------------
/**
* Returns the Chill Kryo Serializer which is implictly added to the classpath via flink-runtime.
* Falls back to the default Kryo serializer if it can't be found.
* @return The Kryo serializer instance.
*/
private Kryo getKryoInstance() {
try {
// check if ScalaKryoInstantiator is in class path (coming from Twitter's Chill library).
// This will be true if Flink's Scala API is used.
Class<?> chillInstantiatorClazz = Class.forName("com.twitter.chill.ScalaKryoInstantiator");
Object chillInstantiator = chillInstantiatorClazz.newInstance();
// obtain a Kryo instance through Twitter Chill
Method m = chillInstantiatorClazz.getMethod("newKryo");
return (Kryo) m.invoke(chillInstantiator);
} catch (ClassNotFoundException | InstantiationException | NoSuchMethodException |
IllegalAccessException | InvocationTargetException e) {
LOG.warn("Falling back to default Kryo serializer because Chill serializer couldn't be found.", e);
Kryo.DefaultInstantiatorStrategy initStrategy = new Kryo.DefaultInstantiatorStrategy();
initStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
Kryo kryo = new Kryo();
kryo.setInstantiatorStrategy(initStrategy);
return kryo;
}
}
private void checkKryoInitialized() {
if (this.kryo == null) {
this.kryo = getKryoInstance();
// Enable reference tracking.
kryo.setReferences(true);
// Throwable and all subclasses should be serialized via java serialization
kryo.addDefaultSerializer(Throwable.class, new JavaSerializer());
// Add default serializers first, so that they type registrations without a serializer
// are registered with a default serializer
for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> entry: defaultSerializers.entrySet()) {
kryo.addDefaultSerializer(entry.getKey(), entry.getValue().getSerializer());
}
for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> entry: defaultSerializerClasses.entrySet()) {
kryo.addDefaultSerializer(entry.getKey(), entry.getValue());
}
// register the type of our class
kryo.register(type);
// register given types. we do this first so that any registration of a
// more specific serializer overrides this
for (Class<?> type : registeredTypes) {
kryo.register(type);
}
// register given serializer classes
for (Map.Entry<Class<?>, Class<? extends Serializer<?>>> e : registeredTypesWithSerializerClasses.entrySet()) {
Class<?> typeClass = e.getKey();
Class<? extends Serializer<?>> serializerClass = e.getValue();
Serializer<?> serializer =
ReflectionSerializerFactory.makeSerializer(kryo, serializerClass, typeClass);
kryo.register(typeClass, serializer);
}
// register given serializers
for (Map.Entry<Class<?>, ExecutionConfig.SerializableSerializer<?>> e : registeredTypesWithSerializers.entrySet()) {
kryo.register(e.getKey(), e.getValue().getSerializer());
}
// this is needed for Avro but can not be added on demand.
kryo.register(GenericData.Array.class, new SpecificInstanceCollectionSerializerForArrayList());
kryo.setRegistrationRequired(false);
kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
}
}
// --------------------------------------------------------------------------------------------
// For testing
// --------------------------------------------------------------------------------------------
public Kryo getKryo() {
checkKryoInitialized();
return this.kryo;
}
}
{code}
> KryoSerializer random exception
> -------------------------------
>
> Key: FLINK-4719
> URL: https://issues.apache.org/jira/browse/FLINK-4719
> Project: Flink
> Issue Type: Bug
> Components: Core
> Affects Versions: 1.1.1
> Reporter: Flavio Pompermaier
> Labels: kryo, serialization
>
> There's a random exception that involves somehow the KryoSerializer when using POJOs in Flink jobs reading large volumes of data.
> It is usually thrown in several places, e.g. (the Exceptions reported here can refer to previous versions of Flink...):
> {code}
> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at createResult(IndexMappingExecutor.java:42)) -> Map (Map at main(Jsonizer.java:128))' , caused an error: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: Unable to find class: java.ttil.HashSet
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:456)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: Unable to find class: java.ttil.HashSet
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
> at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1079)
> at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:450)
> ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: Unable to find class: java.ttil.HashSet
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
> Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: java.ttil.HashSet
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
> at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
> at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:242)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:252)
> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:556)
> at org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:75)
> at org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
> at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
> Caused by: java.lang.ClassNotFoundException: java.ttil.HashSet
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
> {code}
> {code}
> Caused by: java.io.IOException: Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.
> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:142)
> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
> at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
> at java.util.ArrayList.elementData(ArrayList.java:418)
> at java.util.ArrayList.get(ArrayList.java:431)
> at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
> at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
> at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
> at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
> {code}
> {code}
> java.lang.RuntimeException: Cannot instantiate class.
> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:407)
> at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
> at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassNotFoundException: it.okkam.flink.test.model.pojo.VdhicleEvent
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:405)
> ... 10 more
> {code}
> {code}
> com.esotericsoftware.kryo.KryoException: Unable to find class: ^Z^A
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
> at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
> at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
> at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
> at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:96)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> at java.lang.Thread.run(Thread.java:745)
> {code}
> {code}
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -2
> at java.util.ArrayList.elementData(ArrayList.java:418)
> at java.util.ArrayList.get(ArrayList.java:431)
> at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
> at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
> at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
> at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:232)
> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
> at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:135)
> at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
> at org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:100)
> at org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:161)
> at org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:113)
> at org.apache.flink.runtime.operators.util.metrics.CountingMutableObjectIterator.next(CountingMutableObjectIterator.java:45)
> at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.advanceToNext(NonReusingKeyGroupedIterator.java:130)
> at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator.access$300(NonReusingKeyGroupedIterator.java:32)
> at org.apache.flink.runtime.util.NonReusingKeyGroupedIterator$ValuesIterator.next(NonReusingKeyGroupedIterator.java:192)
> at org.okkam.entitons.mapping.flink.IndexMappingExecutor$TupleToEntitonJsonNode.reduce(IndexMappingExecutor.java:64)
> at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:131)
> at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
> at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> at java.lang.Thread.run(Thread.java:745)
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)