Posted to user@flink.apache.org by Matthias Broecheler <ma...@dataeng.ai> on 2021/08/19 23:01:27 UTC

DataStream to Table API

Hey Flinkers,

I am trying to follow the docs
<https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/data_stream_api>
to convert a DataStream to a Table. Specifically, I have a DataStream of Row
and want the columns of the row to become the columns of the resulting
table.

That works but only if I construct the Rows statically. If I construct them
dynamically (in a map) then Flink turns the entire Row into one column of
type "RAW('org.apache.flink.types.Row', '...')".

Does anybody know why this is the case or how to fix it? Take a look at the
simple Flink program below where I construct the DataStream "rows" in two
different ways. I would expect those to be identical (and the sink does
print identical information) but the inferred table schema is different.

Thanks a ton,
Matthias

------------------------------

        StreamExecutionEnvironment flinkEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        flinkEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);

        DataStream<Integer> integers = flinkEnv.fromElements(12, 5);

        DataStream<Row> rows = integers.map(i -> Row.of("Name"+i, i));

//  This alternative way of constructing this data stream produces the expected table schema
//      DataStream<Row> rows = flinkEnv.fromElements(Row.of("Name12", 12), Row.of("Name5", 5));

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(flinkEnv);
        Table table = tableEnv.fromDataStream(rows);
        table.printSchema();

        rows.addSink(new PrintSinkFunction<>());

        flinkEnv.execute();

Re: DataStream to Table API

Posted by Matthias Broecheler <ma...@dataeng.ai>.
Perfect, that worked.

Thanks a lot, JING!

On Sun, Aug 22, 2021 at 1:25 AM JING ZHANG <be...@gmail.com> wrote:

> Hi Matthias,
> Before the bug is fixed, you could specify the return type explicitly in
> the second parameter of the map function.
>
> DataStream<Row> rows = integers.map(i -> Row.of("Name"+i, i));   ->
>
> DataStream<Row> rows = integers.map(i -> Row.of("Name"+i, i), new RowTypeInfo(Types.STRING, Types.INT));
>
> Best,
> JING ZHANG

Re: DataStream to Table API

Posted by JING ZHANG <be...@gmail.com>.
Hi Matthias,
Until the bug is fixed, you can work around it by passing the return type
explicitly as the second parameter of map, so that

DataStream<Row> rows = integers.map(i -> Row.of("Name"+i, i));

becomes

DataStream<Row> rows = integers.map(i -> Row.of("Name"+i, i), new RowTypeInfo(Types.STRING, Types.INT));
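
For reference, the full program with this workaround applied might look like the sketch below. The class name RowTypeHintExample is made up for illustration, and the imports assume Flink 1.13 with the Java Table API bridge on the classpath. Types here is org.apache.flink.api.common.typeinfo.Types, not any other class of the same name.

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

// Hypothetical class name; the body follows the program from the original post.
public class RowTypeHintExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment flinkEnv =
                StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> integers = flinkEnv.fromElements(12, 5);

        // The second argument tells Flink the field types of the produced Row,
        // so it does not have to infer them from the lambda.
        DataStream<Row> rows = integers.map(
                i -> Row.of("Name" + i, i),
                new RowTypeInfo(Types.STRING, Types.INT));

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(flinkEnv);
        Table table = tableEnv.fromDataStream(rows);

        // With the type hint in place, the schema should list one string column
        // and one integer column instead of a single RAW column.
        table.printSchema();
    }
}
```

The order of the types passed to RowTypeInfo has to match the order of the values passed to Row.of.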

Best,
JING ZHANG



On Sat, Aug 21, 2021 at 12:40 AM Matthias Broecheler <ma...@dataeng.ai> wrote:

> Thank you, Caizhi, for looking into this and identifying the source of the
> bug. Is there a way to work around this at the API level until this bug is
> resolved? Can I somehow "inject" the type?
>
> Thanks a lot for your help,
> Matthias

Re: DataStream to Table API

Posted by Matthias Broecheler <ma...@dataeng.ai>.
Thank you, Caizhi, for looking into this and identifying the source of the
bug. Is there a way to work around this at the API level until this bug is
resolved? Can I somehow "inject" the type?

Thanks a lot for your help,
Matthias

On Thu, Aug 19, 2021 at 10:15 PM Caizhi Weng <ts...@gmail.com> wrote:

> Hi!
>
> I've created a JIRA ticket[1] for this issue. Please check it out and
> track the progress there.
>
> [1] https://issues.apache.org/jira/browse/FLINK-23885

Re: DataStream to Table API

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

I've created a JIRA ticket[1] for this issue. Please check it out and track
the progress there.

[1] https://issues.apache.org/jira/browse/FLINK-23885

On Fri, Aug 20, 2021 at 10:47 AM Caizhi Weng <ts...@gmail.com> wrote:

> Hi!
>
> This happens because TypeExtractor#getMapReturnTypes does not handle row
> types (see that method and also TypeExtractor#privateGetForClass). You
> might want to open a JIRA ticket for this.

Re: DataStream to Table API

Posted by Caizhi Weng <ts...@gmail.com>.
Hi!

This happens because TypeExtractor#getMapReturnTypes does not handle row
types (see that method and also TypeExtractor#privateGetForClass). You
might want to open a JIRA ticket for this.
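
To make the difference between the two construction paths more concrete, here is a toy model in plain Java. It is not Flink's TypeExtractor code, and ToyRow, schemaFromInstance and schemaFromClass are made-up names. It only illustrates the general idea, as I understand it: a concrete row instance can be inspected field by field, while the declared return class of a lambda says nothing about what the fields contain.

```java
import java.util.Arrays;
import java.util.function.Function;
import java.util.stream.Collectors;

public class RowInferenceSketch {

    /** Toy stand-in for a row: an untyped array of fields (hypothetical, not Flink's Row). */
    static final class ToyRow {
        final Object[] fields;

        ToyRow(Object... fields) {
            this.fields = fields;
        }
    }

    /** With a concrete instance in hand, each field's runtime class can be inspected. */
    static String schemaFromInstance(ToyRow row) {
        return Arrays.stream(row.fields)
                .map(field -> field.getClass().getSimpleName())
                .collect(Collectors.joining(", ", "ROW<", ">"));
    }

    /** With only the declared class, the field types are unknown, so the type stays opaque. */
    static String schemaFromClass(Class<?> declaredClass) {
        return "RAW('" + declaredClass.getSimpleName() + "')";
    }

    public static void main(String[] args) {
        // Analogous to building the stream from existing elements: instances are available.
        System.out.println(schemaFromInstance(new ToyRow("Name12", 12)));

        // Analogous to a map lambda: before any data flows, only the return class is known.
        Function<Integer, ToyRow> mapper = i -> new ToyRow("Name" + i, i);
        System.out.println(schemaFromClass(ToyRow.class));

        // The rows produced at runtime are the same either way; only the static
        // information available up front differs.
        System.out.println(schemaFromInstance(mapper.apply(5)));
    }
}
```

This would also explain why the sink prints identical rows in both cases while the inferred table schema differs.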
