You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Jeff Zhang (Jira)" <ji...@apache.org> on 2020/01/13 08:05:00 UTC

[jira] [Commented] (FLINK-15566) Flink implicitly order the fields in PojoTypeInfo

    [ https://issues.apache.org/jira/browse/FLINK-15566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17014100#comment-17014100 ] 

Jeff Zhang commented on FLINK-15566:
------------------------------------

[~aljoscha] Could you take a look at this issue? It seems you are the original author of that piece of code.

> Flink implicitly order the fields in PojoTypeInfo
> -------------------------------------------------
>
>                 Key: FLINK-15566
>                 URL: https://issues.apache.org/jira/browse/FLINK-15566
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / Core
>    Affects Versions: 1.10.0
>            Reporter: Jeff Zhang
>            Priority: Major
>         Attachments: image-2020-01-13-16-02-57-949.png
>
>
> I don't know why Flink does this, but it causes my user-defined function to behave incorrectly when I use a POJO in my UDF and override getResultType.
> [https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/java/typeutils/PojoTypeInfo.java#L85]
>  
> Here's the UDF I defined.
>  
> {code:java}
> %flink
> import org.apache.flink.api.java.typeutils.RowTypeInfo
> import org.apache.flink.api.common.typeinfo.Types
> import org.apache.flink.api.java.typeutils._
> import org.apache.flink.api.scala.typeutils._
> import org.apache.flink.api.scala._
> // Plain (non-case) class with 17 constructor vals holding one parsed record.
> // ParseFunction's getResultType lists PojoFields for these vals in this same order.
> class Person(val age:Int, val job: String, val marital: String, val education: String, val default: String, val balance: String, val housing: String, val loan: String, val contact: String, val day: String, val month: String, val duration: Int, val campaign: Int, val pdays: Int, val previous: Int, val poutcome: String, val y: String)
> // Table function that parses one ';'-separated input line into a single output row
> // with the 17 Person columns; the header line (first column "age" in quotes) is skipped.
> class ParseFunction extends TableFunction[Person] {
>   // Splits `line` on ';' and emits one row for every non-header line.
>   // Tokens 0 and 11-14 are parsed with toInt (NumberFormatException on bad input);
>   // the remaining tokens are strings passed through normalize. At least 17 tokens
>   // are required, otherwise tokens(16) throws ArrayIndexOutOfBoundsException.
>   def eval(line: String) {
>     val tokens = line.split(";")
>     // parse the line, skipping the header row
>     if (!line.startsWith("\"age\"")) {
>       // Values are emitted in Person's constructor order (age ... y).
>       // NOTE(review): this emits a Row although the class extends TableFunction[Person];
>       // presumably collect expects a Person here — confirm this compiles as intended.
>       collect(Row.of(new Integer(tokens(0).toInt), normalize(tokens(1)), normalize(tokens(2)), normalize(tokens(3)), normalize(tokens(4)), normalize(tokens(5)), normalize(tokens(6)), normalize(tokens(7)), normalize(tokens(8)), normalize(tokens(9)), normalize(tokens(10)), new Integer(tokens(11).toInt),  new Integer(tokens(12).toInt),  
>            new Integer(tokens(13).toInt), new Integer(tokens(14).toInt),  normalize(tokens(15)), normalize(tokens(16))))
>     }
>   }
>   
>   // Declares the result type as a PojoTypeInfo whose fields are listed in Person's
>   // declaration order, matching the order in which eval emits values.
>   // Per this report, Flink reorders these fields internally (see the linked
>   // PojoTypeInfo.java#L85 — presumably sorting by field name), so the resulting
>   // column order no longer matches the emitted value order.
>   override def getResultType() = {
>     val cls = classOf[Person]
>     new PojoTypeInfo[Person](classOf[Person], java.util.Arrays.asList(
>        new PojoField(cls.getDeclaredField("age"), Types.INT),
>        new PojoField(cls.getDeclaredField("job"), Types.STRING),
>        new PojoField(cls.getDeclaredField("marital"), Types.STRING),
>        new PojoField(cls.getDeclaredField("education"), Types.STRING),
>        new PojoField(cls.getDeclaredField("default"), Types.STRING),
>        new PojoField(cls.getDeclaredField("balance"), Types.STRING), 
>        new PojoField(cls.getDeclaredField("housing"), Types.STRING), 
>        new PojoField(cls.getDeclaredField("loan"), Types.STRING), 
>        new PojoField(cls.getDeclaredField("contact"), Types.STRING), 
>        new PojoField(cls.getDeclaredField("day"), Types.STRING), 
>        new PojoField(cls.getDeclaredField("month"), Types.STRING), 
>        new PojoField(cls.getDeclaredField("duration"), Types.INT),
>        new PojoField(cls.getDeclaredField("campaign"), Types.INT),
>        new PojoField(cls.getDeclaredField("pdays"), Types.INT),
>        new PojoField(cls.getDeclaredField("previous"), Types.INT),
>        new PojoField(cls.getDeclaredField("poutcome"), Types.STRING),
>        new PojoField(cls.getDeclaredField("y"), Types.STRING)
>      ))
>   }  
>   // Removes surrounding double quotes: if the token starts with '"', its first and
>   // last characters are dropped (the last one is not checked to be a quote).
>   // NOTE(review): a token consisting of a single '"' makes substring(1, 0) throw.
>   private def normalize(token: String) = {
>       if (token.startsWith("\"")) {
>           token.substring(1, token.length - 1)
>       } else {
>           token
>       }
>   }
> }{code}
> Then I use this UDF in SQL, but I get the wrong result because Flink implicitly reorders the fields.
>  !image-2020-01-13-16-02-57-949.png! 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)