You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Fuyao Li <fu...@oracle.com> on 2022/05/13 04:08:01 UTC
Re: [External] : Re: How to get flink to use POJO serializer when enum is present in POJO class
Hi Tejas,
Yes, you can write a typefactory for enum. But I am assuming Flink should be able to recognize enum by default…
Anyways, you can do something like this:
Types.ENUM(RuleType.class);
This will return you a TypeInfomation which can be used to construct a typefactory..
BTW, could you take a look at my question in email: “How to define TypeInformation for Flink recursive resolved POJO” 😊 ?
Thanks,
Fuyao
From: Tejas B <te...@gmail.com>
Date: Thursday, May 12, 2022 at 16:32
To: Weihua Hu <hu...@gmail.com>
Cc: user <us...@flink.apache.org>
Subject: [External] : Re: How to get flink to use POJO serializer when enum is present in POJO class
Hi Weihua,
This is the error I am getting :
Caused by: org.apache.flink.util.FlinkException: Could not restore operator state backend for CoBroadcastWithNonKeyedOperator_8c5504f305beefca0724b3e55af8ea26_(1/1) from any of the 1 provided restore options. at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:286) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:174) ... 11 more Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore operator state backend at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83) at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createOperatorStateBackend(HashMapStateBackend.java:148) at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:277) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ... 13 more Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: 11 at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138) at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115) at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
From the error it looks like it's falling back to Kryo serializer instead of POJO serializer.
Thanks,
Tejas
On Thu, May 12, 2022 at 7:33 AM Weihua Hu <hu...@gmail.com>> wrote:
Hi, Tejas
These code is works in my idea environment.
Could you provide more error info or log?
Best,
Weihua
2022年5月10日 下午1:22,Tejas B <te...@gmail.com>> 写道:
Hi,
I am trying to get flink schema evolution to work for me using POJO serializer. But I found out that if an enum is present in the POJO then the POJO serializer is not used. Example of my POJO is as follows :
public class Rule {
String id;
int val;
RuleType ruleType;
//Newly added field
//int val2 = 0;
public Rule() {}
public Rule(String id, int val, RuleType ruleType) {
this.id = id;
this.val = val;
this.ruleType = ruleType;
//this.val2 = val2;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public int getVal() {
return val;
}
public void setVal(int val) {
this.val = val;
}
public RuleType getRuleType() {
return ruleType;
}
public void setRuleType(RuleType ruleType) {
this.ruleType = ruleType;
}
//public int getVal2() {
// return val2;
//}
//public void setVal2(int val2) {
// this.val2 = val2;
//}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Rule rule = (Rule) o;
return val == rule.val && id.equals(rule.id<https://urldefense.com/v3/__http:/rule.id/__;!!ACWV5N9M2RV99hQ!In6iqbiXVGV5EOjV4INk4Kin1OlWIZ3n5wT67wwBK6rX_fZONISPLHWNnUGJKNHJYQhO8r_3JDK5HJdbT6Tl$>) && ruleType == rule.ruleType;
}
@Override
public int hashCode() {
return Objects.hash(id, val, ruleType);
}
@Override
public String toString() {
return "Rule{" +
"name='" + id + '\'' +
", val=" + val +
", ruleType=" + ruleType +
'}';
}
}
RuleType is an enum class as follows :
public enum RuleType {
X,
Y,
Z
}
Now for the Rule class the schema evolution (Adding a new field called val2), works only if I write a custom typeFactory for this class.
Is there a way that I can write typeFactory for the enum class ? Why does the flink not recognize enum in a POJO class ?