Posted to issues@flink.apache.org by "Fabian Hueske (JIRA)" <ji...@apache.org> on 2018/03/19 17:30:00 UTC

[jira] [Closed] (FLINK-9025) Problem with convert String to Row

     [ https://issues.apache.org/jira/browse/FLINK-9025?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Fabian Hueske closed FLINK-9025.
--------------------------------
    Resolution: Not A Problem

The {{Row}} data type was not designed for flexible schemas.

Closing as "Not a problem".
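
For readers hitting the same exception, here is a minimal sketch of one possible user-side workaround, not an official Flink recommendation: widen each parsed value to the declared field type before it is placed in the Row, so that every record matches the fixed schema. The class RowFieldWidening and its methods widen and widenAll are hypothetical names introduced for illustration, and a plain Object[] stands in for the Row fields so the example runs without Flink on the classpath.

```java
public class RowFieldWidening {

    /**
     * Returns the value unchanged if it already matches the declared type,
     * widens it when a lossless numeric widening exists, and fails otherwise.
     * Hypothetical helper, not part of Flink.
     */
    static Object widen(Object value, Class<?> declaredType) {
        if (value == null || declaredType.isInstance(value)) {
            return value;
        }
        if (declaredType == Long.class
                && (value instanceof Integer || value instanceof Short || value instanceof Byte)) {
            return ((Number) value).longValue();
        }
        if (declaredType == Double.class
                && (value instanceof Float || value instanceof Integer)) {
            return ((Number) value).doubleValue();
        }
        throw new IllegalArgumentException(
                "Cannot widen " + value.getClass().getName() + " to " + declaredType.getName());
    }

    /** Applies widen() position by position; the arity must match the declared schema. */
    static Object[] widenAll(Object[] values, Class<?>[] declaredTypes) {
        if (values.length != declaredTypes.length) {
            throw new IllegalArgumentException("Arity does not match the declared schema.");
        }
        Object[] result = new Object[values.length];
        for (int i = 0; i < values.length; i++) {
            result[i] = widen(values[i], declaredTypes[i]);
        }
        return result;
    }

    public static void main(String[] args) {
        // field0 arrives as an Integer, but the schema declares it as Long.
        Object[] parsed = {42, "sensor-a", null};
        Class<?>[] schema = {Long.class, String.class, Long.class};

        Object[] widened = widenAll(parsed, schema);
        System.out.println(widened[0].getClass().getSimpleName()); // prints: Long
    }
}
```

In a Flink job, the assumption is that such a helper would be called inside the user's map function (ConvertDataStream in the report), with each widened value passed to Row.setField(i, value), so that RowSerializer only ever sees values of the declared types.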

> Problem with convert String to Row
> ----------------------------------
>
>                 Key: FLINK-9025
>                 URL: https://issues.apache.org/jira/browse/FLINK-9025
>             Project: Flink
>          Issue Type: Bug
>          Components: Java API
>            Reporter: ChenDonghe
>            Priority: Major
>
> When we convert a JSON string into a Row and set the types of the fields in the row, as in the code below:
> {code:java}
> DataStream<String> stream = env.addSource(consumer.getInstance(sourceTopic, new SimpleStringSchema()).setStartFromLatest());
> DataStream<Row> dataStreamRow = stream.map(new ConvertDataStream()).returns(typeinfo);
> {code}
> This conversion calls the copy function in the RowSerializer class, shown below:
>  
> {code:java}
> // public final class RowSerializer extends TypeSerializer<Row>
> @Override
> public Row copy(Row from) {
>    int len = fieldSerializers.length;
>    if (from.getArity() != len) {
>       throw new RuntimeException("Row arity of from does not match serializers.");
>    }
>    Row result = new Row(len);
>    for (int i = 0; i < len; i++) {
>       Object fromField = from.getField(i);
>       if (fromField != null) {
>          Object copy = fieldSerializers[i].copy(fromField);
>          result.setField(i, copy);
>       } else {
>          result.setField(i, null);
>       }
>    }
>    return result;
> }
> {code}
> The JSON string message from Kafka is converted to a Row. During this process, RowSerializer copies the source row into a new result row, but the fields of the result row are typed as Object. For example, field0 is an Integer in the first message from Kafka and a Long in the second. If we declare field0 as Long, we expect an Integer value to be accepted, but instead we get an exception: cannot convert Integer to Long. We would like the RowSerializer copy function to be more flexible and behave like a table, so that an Integer value can be inserted into a Long field.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)