Posted to user@flink.apache.org by "Sofya T. Irwin" <so...@gmail.com> on 2022/01/06 15:06:11 UTC

Moving off of TypeInformation in Flink 1.11

Hi,

I’m moving my Flink 1.11 application onto the Blink Table Planner, and off
of TypeInformation and onto DataTypes, in preparation for upgrading to
Flink 1.13 or higher.

I’m having trouble moving off of TypeInformation.

Specifically I have a section of code that maps a DataStream[Message] to a
DataStream[Row]:

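  import org.apache.flink.api.common.typeinfo.TypeInformation
  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.types.Row

  // DataStream.map picks up this implicit TypeInformation[Row] for its result type
  implicit val typeInformation: TypeInformation[Row] = myObject.getProducedType
  val resultStream: DataStream[Row] = dataStream.map(msg => myTransform(msg))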

Note that myTransform() takes a Message object and returns a Row object.
Message is an internal class that we use.
The resultStream: DataStream[Row] is later returned by a
StreamTableSource[Row].

If I comment out the implicit val above, I get a failure:

  TableSource of type com.MyTableSourceFromDataStream returned a DataStream of data type
  GenericType<org.apache.flink.types.Row> that does not match with the data type
  ROW<`my_field_1` INT NOT NULL, ... `my_other_field` BIGINT> declared by the
  TableSource.getProducedDataType() method. Please validate the implementation of the TableSource.

I checked the Flink 1.11.4, Flink 1.13, and most recent sources, and it
seems that the implementation of DataStream.map() has not changed and still
uses TypeInformation:

https://github.com/apache/flink/blob/master/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala#L657

Based on the code above, it seems that the issue is that Flink's
DataStream.map function uses TypeInformation.

I’m not sure whether there’s an equivalent DataType implicit that I should
be declaring instead, or whether I should be using some function other than
map.

Do you have any suggestions for how to proceed? I'd like to completely move
off of TypeInformation in my app.

Thanks,
Sofya

Re: Moving off of TypeInformation in Flink 1.11

Posted by Francesco Guardiani <fr...@ververica.com>.

Hi Sofya,
The DataStream API doesn't use DataTypes; it still uses TypeInformation.
DataTypes and LogicalTypes are relevant only for the Table API.
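
Because the DataStream API still needs a TypeInformation[Row] for the result of map, one option is to build a named row type explicitly so that its field names and types line up with the ROW type that getProducedDataType() declares. This is a minimal sketch, assuming only the two fields visible in the error message (the elided fields are left out); RowTypeSketch is a hypothetical name. TypeInformation has no notion of NOT NULL, so `my_field_1` INT NOT NULL is represented by plain Types.INT here.

```scala
import org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.types.Row

object RowTypeSketch {
  // Hypothetical row type mirroring the two fields shown in the error message;
  // the fields elided with "..." are intentionally not reconstructed.
  val rowType: TypeInformation[Row] =
    Types.ROW_NAMED(Array("my_field_1", "my_other_field"), Types.INT, Types.LONG)

  def main(args: Array[String]): Unit = {
    // Types.ROW_NAMED builds a RowTypeInfo that keeps the given field names
    val info = rowType.asInstanceOf[RowTypeInfo]
    println(info.getFieldNames.mkString(",")) // prints: my_field_1,my_other_field
  }
}
```

Since the Scala API's map takes its TypeInformation as an implicit (context-bound) parameter, the row type could also be passed explicitly instead of through an implicit val, e.g. `dataStream.map(msg => myTransform(msg))(RowTypeSketch.rowType)`.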

If I understand what you're trying to do, you don't need to manually
transform to Row; you only need to define the Schema when crossing the
boundary from the DataStream API to the Table API through
StreamTableEnvironment#fromDataStream
<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/data_stream_api/#examples-for-fromdatastream>.
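
A minimal sketch of that approach, assuming Flink 1.13 or later (fromDataStream(DataStream, Schema) and the Schema builder were introduced in 1.13), the Scala Table API bridge and planner on the classpath, and a hypothetical case class Message standing in for the internal class. The field names myField1 and myOtherField, and the object name FromDataStreamSketch, are illustrative only. The stream keeps the TypeInformation that Flink derives, while the column types are declared with DataTypes at the Table API boundary.

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

// Hypothetical stand-in for the internal Message class; a case class (or POJO)
// lets Flink derive the physical columns without a manual Row conversion.
case class Message(myField1: Int, myOtherField: Long)

object FromDataStreamSketch {
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

  // DataStream side: type information is still derived by the Scala API macro
  val messages: DataStream[Message] =
    env.fromElements(Message(1, 10L), Message(2, 20L))

  // Table side: column names and DataTypes are declared when crossing the boundary
  val table: Table = tableEnv.fromDataStream(
    messages,
    Schema.newBuilder()
      .column("myField1", DataTypes.INT())
      .column("myOtherField", DataTypes.BIGINT())
      .build())

  def main(args: Array[String]): Unit = {
    table.printSchema()
  }
}
```

If the real Message class is neither a case class nor a valid POJO, Flink would likely treat it as a generic type whose fields cannot be mapped to columns, so a mapping step into such a type would probably still be needed first.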


Look at the javadoc of this method:
https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.html#fromDataStream-org.apache.flink.streaming.api.datastream.DataStream-org.apache.flink.table.api.Schema-

Hope it helps,
FG
