Posted to user@flink.apache.org by Ido Bar Av <id...@microsoft.com> on 2017/08/14 14:35:17 UTC
Serialization problem: Using generic that extends a class on POJO.
Hi,
We're using Flink 1.3.1 and are trying to pass a POJO that has a generic field through the pipeline (see the complete example below).
We have the class Foo<SomeKey extends BarKey>, and when we send a subclass with a specific SomeKey through the pipeline, we get the following exception:
java.lang.RuntimeException: Cannot instantiate class.
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createInstance(PojoSerializer.java:201)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:395)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:206)
at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:48)
at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:176)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Can not set ....BarKey field ...Foo.someKey to java.lang.Object
at java.lang.reflect.Field.set(Field.java:764)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.initializeFields(PojoSerializer.java:209)
at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createInstance(PojoSerializer.java:197)
... 10 more
If I understand correctly, the deserializer used for SomeKey creates a plain java.lang.Object (before filling it in), ignoring the fact that SomeKey extends BarKey, and then fails when it tries to assign that object to the field declared in the parent class.
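The failing step in the "Caused by" line can be reproduced without Flink. The sketch below is an illustration under assumptions, not Flink code: FieldSetDemo and its helpers are hypothetical names, and it only mimics the last step visible in the stack trace, where PojoSerializer.initializeFields calls java.lang.reflect.Field.set. Because the type variable is bounded by BarKey, the field's erased runtime type is BarKey, so storing a plain java.lang.Object in it is rejected with an IllegalArgumentException.

```java
import java.lang.reflect.Field;

public class FieldSetDemo {
    // Hypothetical, stripped-down stand-ins for the BarKey and Foo classes in this thread.
    public static class BarKey {}

    public static class Foo<SomeKey extends BarKey> {
        // The bounded type variable erases to BarKey, so the runtime field type is BarKey.
        public SomeKey someKey;
    }

    public static Field someKeyField() {
        try {
            return Foo.class.getField("someKey");
        } catch (NoSuchFieldException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Returns true if reflection refuses to store a plain java.lang.Object in Foo.someKey. */
    public static boolean rejectsPlainObject() {
        try {
            someKeyField().set(new Foo<BarKey>(), new Object());
            return false;
        } catch (IllegalArgumentException e) {
            // Same exception type as the "Caused by" line in the stack trace.
            System.out.println(e.getMessage());
            return true;
        } catch (IllegalAccessException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(someKeyField().getType().getSimpleName());
        System.out.println(rejectsPlainObject());
    }
}
```

Running main prints BarKey, then the reflection error message, then true; the exact wording of the message may vary between JDK versions.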
What is the correct approach for this situation?
Thanks,
Ido
Complete code example:
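// Imports used by the classes below
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BarKey implements Serializable {
    public List<Long> valueList;

    public BarKey() {
    }

    public BarKey(long value) {
        super();
        valueList = new ArrayList<>();
        valueList.add(value);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        BarKey barKey = (BarKey) o;
        return Objects.equals(valueList, barKey.valueList);
    }

    @Override
    public int hashCode() {
        return Objects.hash(valueList);
    }
}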
public class SomeKey extends BarKey implements Serializable {
    public Integer banana = 1;

    public SomeKey() {
    }

    public SomeKey(long value) {
        super(value);
    }
}
// Note: the type parameter SomeKey shadows the class SomeKey declared above.
public class Foo<SomeKey extends BarKey> implements Serializable {
    public Foo() {}

    public SomeKey someKey;

    public Foo(SomeKey someKey) {
        this.someKey = someKey;
    }
}
public class FooFoo<SomeKey extends BarKey> extends Foo<SomeKey> implements Serializable {
    public FooFoo() {
    }

    public Integer grill = 12;

    public FooFoo(SomeKey someKey) {
        super(someKey);
    }
}
// Emits FooFoo instances (a subclass of Foo) on a stream typed as Foo<BarKey>.
class MakeFoo extends ProcessFunction<Integer, Foo<BarKey>> implements Serializable {
    @Override
    public void processElement(Integer value, Context ctx, Collector<Foo<BarKey>> out) throws Exception {
        out.collect(new FooFoo<>(new SomeKey((long) value)));
    }
}
class FooProcessor extends ProcessFunction<Foo<BarKey>, Foo<BarKey>> implements Serializable {
    @Override
    public void processElement(Foo<BarKey> value, Context ctx, Collector<Foo<BarKey>> out) throws Exception {
        value.someKey.valueList.add(1L);
        out.collect(value);
    }
}
class FooBarSelector<SomeKey extends BarKey> implements KeySelector<Foo<SomeKey>, BarKey>, Serializable {
    @Override
    public BarKey getKey(Foo<SomeKey> value) throws Exception {
        return value.someKey;
    }
}
class FooBarSink implements Serializable, SinkFunction<Foo<BarKey>> {
    private static final Logger logger = LoggerFactory.getLogger(FooBarSink.class);

    public long dosomething = 0;

    @Override
    public void invoke(Foo<BarKey> value) throws Exception {
        dosomething += value.someKey.valueList.size();
        logger.warn("Sink {}", dosomething);
    }
}
Test code:
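// Assumed setup, not shown in the original snippet:
// import org.apache.flink.streaming.api.datastream.DataStreamSource;
// import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.registerType(FooFoo.class); // Not certain if this is needed
List<Integer> intlist = new ArrayList<>();
intlist.add(3);
intlist.add(5);
DataStreamSource<Integer> streamSource = environment.fromCollection(intlist);
streamSource.process(new MakeFoo())
        .keyBy(new FooBarSelector<>())
        .process(new FooProcessor())
        .addSink(new FooBarSink());
environment.execute("Jobname-UT");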
Re: Serialization problem: Using generic that extends a class on POJO.
Posted by Timo Walther <tw...@apache.org>.
Hi Ido,
thank you for the good example; it reproduces the problem. I found a
bug in Flink's type extraction logic and opened an issue for it [0]. The
problem seems to be the bounded generics in both Foo and FooFoo:
Foo.someKey gets the wrong type information, namely
GenericType<java.lang.Object>.
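The bound that gets lost here is recoverable through plain reflection. The sketch below is not Flink's TypeExtractor; BoundInspector and resolve are hypothetical names used only for illustration. It shows that BarKey can be found in both places the diagnosis mentions: on Foo.someKey directly, and through FooFoo's generic superclass, where the argument FooFoo passes to Foo is itself FooFoo's own bounded type variable. A resolver that mapped such a variable to Object instead of its bound would produce the kind of wrong type information described above; whether that is the exact cause is what the linked issue tracks.

```java
import java.io.Serializable;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

public class BoundInspector {
    // Hypothetical stand-ins mirroring the class shapes from the example in this thread.
    public static class BarKey implements Serializable {}

    public static class Foo<SomeKey extends BarKey> implements Serializable {
        public SomeKey someKey;
    }

    public static class FooFoo<SomeKey extends BarKey> extends Foo<SomeKey> {}

    /** Maps a type to a raw class, replacing a type variable by its first declared bound. */
    public static Class<?> resolve(Type type) {
        if (type instanceof Class) {
            return (Class<?>) type;
        }
        if (type instanceof ParameterizedType) {
            return resolve(((ParameterizedType) type).getRawType());
        }
        if (type instanceof TypeVariable) {
            return resolve(((TypeVariable<?>) type).getBounds()[0]);
        }
        return Object.class; // wildcards and generic arrays are out of scope for this sketch
    }

    /** Generic type of Foo.someKey: the type variable SomeKey declared by Foo. */
    public static Type someKeyGenericType() {
        try {
            return Foo.class.getField("someKey").getGenericType();
        } catch (NoSuchFieldException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Type argument FooFoo passes to Foo: FooFoo's own bounded type variable. */
    public static Type fooFooSuperTypeArgument() {
        ParameterizedType superType = (ParameterizedType) FooFoo.class.getGenericSuperclass();
        return superType.getActualTypeArguments()[0];
    }

    public static void main(String[] args) {
        System.out.println(someKeyGenericType() + " -> " + resolve(someKeyGenericType()).getSimpleName());
        System.out.println(fooFooSuperTypeArgument() + " -> " + resolve(fooFooSuperTypeArgument()).getSimpleName());
    }
}
```

Running main prints "SomeKey -> BarKey" twice, once for each lookup.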
As a workaround until the issue is fixed, you can do the following:
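import java.lang.reflect.Type;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
@TypeInfo(Foo.TypeFactory.class) // Flink takes Foo's type information from TypeFactory
public static class Foo<SomeKey extends BarKey> implements Serializable {
    public SomeKey someKey;
    public Foo() {}
    public Foo(SomeKey someKey) {
        this.someKey = someKey;
    }
    public static class TypeFactory extends TypeInfoFactory<Foo> {
        @Override
        public TypeInformation<Foo> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
            return new GenericTypeInfo<>(Foo.class); // generic type (Kryo by default), not a POJO
        }
    }
}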
I hope that helps.
Regards,
Timo
[0] https://issues.apache.org/jira/browse/FLINK-7450
On 14.08.17 at 17:24, Timo Walther wrote:
> Hi Ido,
>
> at first glance, I could not find any problem in your code, so it
> might be a bug. The "environment.registerType()" call is not needed in
> your case, because you have no generic types.
>
> I will have a closer look at it tomorrow.
>
> Regards,
> Timo
Re: Serialization problem: Using generic that extends a class on POJO.
Posted by Timo Walther <tw...@apache.org>.
Hi Ido,
at first glance, I could not find any problem in your code, so it
might be a bug. The "environment.registerType()" call is not needed in
your case, because you have no generic types.
I will have a closer look at it tomorrow.
Regards,
Timo