Posted to user@flink.apache.org by Fuyao Li <fu...@oracle.com> on 2022/05/13 04:08:01 UTC

Re: [External] : Re: How to get flink to use POJO serializer when enum is present in POJO class

Hi Tejas,

Yes, you can write a type factory for the enum. But I would expect Flink to recognize enums by default…

Anyways, you can do something like this:

Types.ENUM(RuleType.class);

This returns a TypeInformation, which you can use to construct a type factory.
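
A minimal sketch of such a type factory is shown here, assuming Flink's TypeInfoFactory and @TypeInfo annotation from org.apache.flink.api.common.typeinfo are on the classpath. The names RuleTypeFactoryExample and RuleTypeInfoFactory are hypothetical and do not come from this thread; the enum is nested only to keep the example in a single file.

```java
import java.lang.reflect.Type;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInfoFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;

public class RuleTypeFactoryExample {

    // The enum from the thread, annotated so that Flink's type extraction
    // consults the factory below instead of analysing the class itself.
    @TypeInfo(RuleTypeInfoFactory.class)
    public enum RuleType { X, Y, Z }

    // Hypothetical factory: it simply hands back the enum type
    // information produced by Types.ENUM.
    public static class RuleTypeInfoFactory extends TypeInfoFactory<RuleType> {
        @Override
        public TypeInformation<RuleType> createTypeInfo(
                Type t, Map<String, TypeInformation<?>> genericParameters) {
            return Types.ENUM(RuleType.class);
        }
    }

    public static void main(String[] args) {
        // Ask Flink which type information it resolves for the enum.
        TypeInformation<RuleType> info = TypeInformation.of(RuleType.class);
        System.out.println(info);
    }
}
```

Whether this changes anything for the restore failure discussed in this thread is untested; it only shows the shape of a factory built on Types.ENUM.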

BTW, could you take a look at my question in email: “How to define TypeInformation for Flink recursive resolved POJO” 😊 ?

Thanks,
Fuyao




From: Tejas B <te...@gmail.com>
Date: Thursday, May 12, 2022 at 16:32
To: Weihua Hu <hu...@gmail.com>
Cc: user <us...@flink.apache.org>
Subject: [External] : Re: How to get flink to use POJO serializer when enum is present in POJO class
Hi Weihua,
This is the error I am getting:
Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for CoBroadcastWithNonKeyedOperator_8c5504f305beefca0724b3e55af8ea26_(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:286)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:174)
    ... 11 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
    at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createOperatorStateBackend(HashMapStateBackend.java:148)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:277)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    ... 13 more
Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: 11
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)

From the error, it looks like Flink is falling back to the Kryo serializer instead of the POJO serializer.
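
For context, the Flink documentation lists structural rules a class has to satisfy before it is treated as a POJO: the class is public, it has a public no-argument constructor, and every non-static, non-transient field is either public or reachable through a getter and a setter. The sketch below approximates those rules with plain reflection as a quick sanity check. It is not Flink's own type extraction, it ignores inherited fields and field types, and the names PojoRuleCheck, looksLikePojo, and NoAccessors are hypothetical.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

public class PojoRuleCheck {

    public enum RuleType { X, Y, Z }

    // Trimmed-down stand-in for the Rule class from the thread.
    public static class Rule {
        String id;
        int val;
        RuleType ruleType;

        public Rule() {}

        public String getId() { return id; }
        public void setId(String id) { this.id = id; }
        public int getVal() { return val; }
        public void setVal(int val) { this.val = val; }
        public RuleType getRuleType() { return ruleType; }
        public void setRuleType(RuleType ruleType) { this.ruleType = ruleType; }
    }

    // Counter-example: a private field with no getter or setter.
    public static class NoAccessors {
        private String secret;

        public NoAccessors() {}
    }

    // Simplified approximation of the documented POJO rules.
    public static boolean looksLikePojo(Class<?> clazz) {
        if (!Modifier.isPublic(clazz.getModifiers())) {
            return false;
        }
        try {
            clazz.getConstructor(); // requires a public no-argument constructor
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field field : clazz.getDeclaredFields()) {
            int mods = field.getModifiers();
            if (Modifier.isStatic(mods) || Modifier.isTransient(mods) || field.isSynthetic()) {
                continue; // these fields are not part of the check
            }
            if (!Modifier.isPublic(mods) && !hasAccessors(clazz, field)) {
                return false;
            }
        }
        return true;
    }

    // True if public getX()/isX() and setX(type) methods exist for the field.
    static boolean hasAccessors(Class<?> clazz, Field field) {
        String name = field.getName();
        String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
        try {
            Method getter;
            try {
                getter = clazz.getMethod("get" + suffix);
            } catch (NoSuchMethodException e) {
                getter = clazz.getMethod("is" + suffix);
            }
            clazz.getMethod("set" + suffix, field.getType());
            return getter.getReturnType() == field.getType();
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("Rule: " + looksLikePojo(Rule.class));
        System.out.println("NoAccessors: " + looksLikePojo(NoAccessors.class));
    }
}
```

A class that passes this check can still end up on the Kryo path for other reasons, for example because of how the state descriptor was declared, so a passing result only rules out the structural causes.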

Thanks,
Tejas


On Thu, May 12, 2022 at 7:33 AM Weihua Hu <hu...@gmail.com> wrote:
Hi, Tejas

This code works in my IDEA environment.
Could you provide more error information or logs?


Best,
Weihua


On May 10, 2022, at 1:22 PM, Tejas B <te...@gmail.com> wrote:

Hi,
I am trying to get Flink schema evolution to work using the POJO serializer. However, I found that if an enum is present in the POJO, the POJO serializer is not used. An example of my POJO is as follows:

import java.util.Objects;

public class Rule {
    String id;
    int val;
    RuleType ruleType;
    //Newly added field
    //int val2 = 0;

    // No-argument constructor
    public Rule() {}

    public Rule(String id, int val, RuleType ruleType) {
        this.id = id;
        this.val = val;
        this.ruleType = ruleType;
        //this.val2 = val2;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public int getVal() {
        return val;
    }

    public void setVal(int val) {
        this.val = val;
    }

    public RuleType getRuleType() {
        return ruleType;
    }

    public void setRuleType(RuleType ruleType) {
        this.ruleType = ruleType;
    }

    //public int getVal2() {
    //    return val2;
    //}

    //public void setVal2(int val2) {
    //    this.val2 = val2;
    //}

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Rule rule = (Rule) o;
        return val == rule.val && id.equals(rule.id) && ruleType == rule.ruleType;
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, val, ruleType);
    }

    @Override
    public String toString() {
        return "Rule{" +
                "name='" + id + '\'' +
                ", val=" + val +
                ", ruleType=" + ruleType +
                '}';
    }
}

RuleType is an enum defined as follows:

public enum RuleType {
    X,
    Y,
    Z
}

For the Rule class, schema evolution (adding a new field called val2) works only if I write a custom type factory for this class.

Is there a way to write a type factory for the enum class? Why does Flink not recognize an enum in a POJO class?