Posted to user@flink.apache.org by Ido Bar Av <id...@microsoft.com> on 2017/08/14 14:35:17 UTC

Serialization problem: Using generic that extends a class on POJO.

Hi,

We're using Flink 1.3.1, and we're trying to pass a POJO object that has a generic field through the pipeline (see details in the complete example below).
We have the class Foo<SomeKey extends BarKey>, and when sending a subclass with a specific SomeKey, we get the following exception:


java.lang.RuntimeException: Cannot instantiate class.
              at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createInstance(PojoSerializer.java:201)
              at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:395)
              at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:206)
              at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.deserialize(StreamElementSerializer.java:48)
              at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
              at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:109)
              at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:176)
              at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
              at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
              at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
              at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Can not set ....BarKey field ...Foo.someKey to java.lang.Object
              at java.lang.reflect.Field.set(Field.java:764)
              at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.initializeFields(PojoSerializer.java:209)
              at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.createInstance(PojoSerializer.java:197)
              ... 10 more


If I understand correctly, for some reason, the deserializer used for SomeKey returns Object (before filling it), ignoring the fact that SomeKey extends a BarKey, and then fails when trying to assign it to the parent class.
What is the correct approach for this situation?

Thanks,
Ido



Complete code example:
public class BarKey implements Serializable {
    public List<Long> valueList;

    public BarKey() {
    }

    public BarKey(long value) {
        super();
        valueList = new ArrayList<>();
        valueList.add(value);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        BarKey barKey = (BarKey) o;
        return Objects.equals(valueList, barKey.valueList);
    }

    @Override
    public int hashCode() {
        return Objects.hash(valueList);
    }
}


public class SomeKey extends BarKey implements Serializable {
    public Integer banana=1;

    public SomeKey() {
    }

    public SomeKey(long value) {
        super(value);
    }
}


public class Foo<SomeKey extends BarKey> implements Serializable {

    public Foo() {}
    public SomeKey someKey;

    public Foo(SomeKey someKey) {
        this.someKey = someKey;
    }

}

public class FooFoo<SomeKey extends BarKey> extends Foo<SomeKey> implements Serializable {
    public FooFoo() {
    }

    public Integer grill = 12;
    public FooFoo(SomeKey someKey) {
        super(someKey);
    }

}
class MakeFoo extends ProcessFunction<Integer, Foo<BarKey>> implements Serializable {
    @Override
    public void processElement(Integer value, Context ctx, Collector<Foo<BarKey>> out) throws Exception {
        out.collect(new FooFoo<>(new SomeKey((long) value)));
    }
}


class FooProcessor extends ProcessFunction<Foo<BarKey>, Foo<BarKey>> implements Serializable {
    @Override
    public void processElement(Foo<BarKey> value, Context ctx, Collector<Foo<BarKey>> out) throws Exception {
        value.someKey.valueList.add(1L);
        out.collect(value);
    }
}

class FooBarSelector<SomeKey extends BarKey> implements KeySelector<Foo<SomeKey>, BarKey>, Serializable {
    @Override
    public BarKey getKey(Foo<SomeKey> value) throws Exception {
        return value.someKey;
    }
}

class FooBarSink implements Serializable, SinkFunction<Foo<BarKey>> {
    private static final Logger logger = LoggerFactory.getLogger(FooBarSink.class);
    public long dosomething = 0;

    @Override
    public void invoke(Foo<BarKey> value) throws Exception {
        dosomething += value.someKey.valueList.size();
        logger.warn("Sink {}", dosomething);
    }
}


Test code:

        environment.registerType(FooFoo.class); // Not certain if this is needed

        List<Integer> intlist = new ArrayList<>();
        intlist.add(3);
        intlist.add(5);

        DataStreamSource<Integer> streamSource = environment.fromCollection(intlist);

        streamSource.process(new MakeFoo())
             .keyBy(new FooBarSelector<>())
             .process(new FooProcessor())
             .addSink(new FooBarSink());

        environment.execute("Jobname-UT");

Re: Serialization problem: Using generic that extends a class on POJO.

Posted by Timo Walther <tw...@apache.org>.
Hi Ido,

Thank you for the good example that reproduces the problem. I found a 
bug in Flink's type extraction logic and opened an issue for it [0]. The 
problem seems to be the bounded generics in both Foo and FooFoo. 
Foo.someKey gets the wrong type information: it is extracted as 
GenericType<java.lang.Object>.
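
To illustrate why type information of Object for this field ends in the IllegalArgumentException from the stack trace, here is a minimal sketch using only plain JDK reflection. It is not Flink's code: the class ErasureDemo and its helper methods are hypothetical, and the nested classes only mirror the names from the example. The point is that the field's runtime (erased) type is the bound BarKey, so a reflective set with a plain Object is rejected, while a SomeKey is accepted.

```java
import java.lang.reflect.Field;
import java.lang.reflect.TypeVariable;

public class ErasureDemo {
    // Simplified stand-ins for the classes in the thread (assumption: no Flink involved).
    static class BarKey {}
    static class SomeKey extends BarKey {}
    static class Foo<K extends BarKey> {
        public K someKey;
    }

    // Looks up the public field Foo.someKey.
    static Field someKeyField() {
        try {
            return Foo.class.getField("someKey");
        } catch (NoSuchFieldException e) {
            throw new IllegalStateException(e);
        }
    }

    // Runtime type of the field: the erasure of K, which is its bound.
    static Class<?> erasedFieldType() {
        return someKeyField().getType();
    }

    // The generic type is still an unresolved type variable.
    static boolean isTypeVariable() {
        return someKeyField().getGenericType() instanceof TypeVariable;
    }

    // Tries a reflective assignment; false means Field.set rejected the value.
    static boolean canAssign(Object value) {
        try {
            someKeyField().set(new Foo<BarKey>(), value);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        } catch (IllegalAccessException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("erased type: " + erasedFieldType().getSimpleName());
        System.out.println("type variable: " + isTypeVariable());
        System.out.println("Object assignable: " + canAssign(new Object()));
        System.out.println("SomeKey assignable: " + canAssign(new SomeKey()));
    }
}
```

A serializer that creates a placeholder instance from the extracted type would hit the same check if that type is Object rather than BarKey or one of its subclasses.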

As a workaround until the issue is fixed, you can do the following:

@TypeInfo(Foo.TypeFactory.class)
public static class Foo<SomeKey extends BarKey> implements Serializable {

     public SomeKey someKey;

     public Foo() {}

     public Foo(SomeKey someKey) {
         this.someKey = someKey;
     }

     // Treat Foo as a generic type so the faulty POJO field analysis is skipped.
     public static class TypeFactory extends TypeInfoFactory<Foo> {

         @Override
         public TypeInformation<Foo> createTypeInfo(Type t, Map<String, TypeInformation<?>> genericParameters) {
             return new GenericTypeInfo<>(Foo.class);
         }
     }
}


I hope that helps.

Regards,
Timo


[0] https://issues.apache.org/jira/browse/FLINK-7450



Re: Serialization problem: Using generic that extends a class on POJO.

Posted by Timo Walther <tw...@apache.org>.
Hi Ido,

At first glance, I could not find any problem in your code, so it 
might be a bug. The "environment.registerType()" call is not needed in 
your case, because you have no generic types.

I will have a closer look at it tomorrow.

Regards,
Timo
