You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Jeff Zhang (Jira)" <ji...@apache.org> on 2020/01/13 08:05:00 UTC
[jira] [Commented] (FLINK-15566) Flink implicitly order the fields
in PojoTypeInfo
[ https://issues.apache.org/jira/browse/FLINK-15566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17014100#comment-17014100 ]
Jeff Zhang commented on FLINK-15566:
------------------------------------
[~aljoscha] Could you take a look at this issue? It seems you are the original author of that piece of code.
> Flink implicitly order the fields in PojoTypeInfo
> -------------------------------------------------
>
> Key: FLINK-15566
> URL: https://issues.apache.org/jira/browse/FLINK-15566
> Project: Flink
> Issue Type: Improvement
> Components: API / Core
> Affects Versions: 1.10.0
> Reporter: Jeff Zhang
> Priority: Major
> Attachments: image-2020-01-13-16-02-57-949.png
>
>
> I don't know why Flink does this, but it causes my user-defined function to behave incorrectly if I use a POJO in my UDF and override getResultType.
> [https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java#L85]
>
> Here's the udf I define.
>
> {code:java}
> %flink
> import org.apache.flink.api.java.typeutils.RowTypeInfo
> import org.apache.flink.api.common.typeinfo.Types
> import org.apache.flink.api.java.typeutils._
> import org.apache.flink.api.scala.typeutils._
> import org.apache.flink.api.scala._
> class Person(val age:Int, val job: String, val marital: String, val education: String, val default: String, val balance: String, val housing: String, val loan: String, val contact: String, val day: String, val month: String, val duration: Int, val campaign: Int, val pdays: Int, val previous: Int, val poutcome: String, val y: String) // result type of ParseFunction: 17 constructor fields (age first, y last)
> class ParseFunction extends TableFunction[Person] { // table function that turns one ';'-separated text line into one output row
> def eval(line: String) { // splits the line on ';' and collects one row, unless the line is the header
> val tokens = line.split(";")
> // parse the line; the header line (the one starting with "age" in double quotes) is skipped
> if (!line.startsWith("\"age\"")) {
> collect(Row.of(new Integer(tokens(0).toInt), normalize(tokens(1)), normalize(tokens(2)), normalize(tokens(3)), normalize(tokens(4)), normalize(tokens(5)), normalize(tokens(6)), normalize(tokens(7)), normalize(tokens(8)), normalize(tokens(9)), normalize(tokens(10)), new Integer(tokens(11).toInt), new Integer(tokens(12).toInt),
> new Integer(tokens(13).toInt), new Integer(tokens(14).toInt), normalize(tokens(15)), normalize(tokens(16)))) // 17 values, tokens 0..16 in input order; NOTE(review): a Row is collected although the class is declared as TableFunction[Person] — confirm this is intended
> }
> }
>
> override def getResultType() = { // explicit result type: the 17 Person fields, listed in the same order as the Person constructor
> val cls = classOf[Person]
> new PojoTypeInfo[Person](classOf[Person], java.util.Arrays.asList(
> new PojoField(cls.getDeclaredField("age"), Types.INT),
> new PojoField(cls.getDeclaredField("job"), Types.STRING),
> new PojoField(cls.getDeclaredField("marital"), Types.STRING),
> new PojoField(cls.getDeclaredField("education"), Types.STRING),
> new PojoField(cls.getDeclaredField("default"), Types.STRING),
> new PojoField(cls.getDeclaredField("balance"), Types.STRING),
> new PojoField(cls.getDeclaredField("housing"), Types.STRING),
> new PojoField(cls.getDeclaredField("loan"), Types.STRING),
> new PojoField(cls.getDeclaredField("contact"), Types.STRING),
> new PojoField(cls.getDeclaredField("day"), Types.STRING),
> new PojoField(cls.getDeclaredField("month"), Types.STRING),
> new PojoField(cls.getDeclaredField("duration"), Types.INT),
> new PojoField(cls.getDeclaredField("campaign"), Types.INT),
> new PojoField(cls.getDeclaredField("pdays"), Types.INT),
> new PojoField(cls.getDeclaredField("previous"), Types.INT),
> new PojoField(cls.getDeclaredField("poutcome"), Types.STRING),
> new PojoField(cls.getDeclaredField("y"), Types.STRING)
> ))
> }
> // remove the quotes: a token that starts with '"' loses its first and last character (the last character itself is not checked); any other token is returned unchanged
> private def normalize(token: String) = {
> if (token.startsWith("\"")) {
> token.substring(1, token.length - 1)
> } else {
> token
> }
> }
> }{code}
> I then use this UDF in SQL, but I get the wrong result because Flink implicitly reorders the fields.
> !image-2020-01-13-16-02-57-949.png!
--
This message was sent by Atlassian Jira
(v8.3.4#803005)