Posted to user@flink.apache.org by Luo Jason <lu...@outlook.com> on 2020/12/13 05:57:16 UTC

Is there any way to directly convert org.apache.flink.table.api.Table into my POJO?

Hello, I'm new to Flink. Thank you for your help.

My use case is to process logs in a Flink program and finally store them in HBase.


Through Kafka, my Flink application receives log information from other systems. This information cannot be sent to HBase immediately. I first store these logs in a Flink table. When new logs arrive, the associated logs are selected from the Flink table for calculation. Depending on the result, they are either stored in HBase or put back into the Flink table.

My problem is that when I query the Flink table with SQL statements, the result comes back as an org.apache.flink.table.api.Table. From the Flink documentation, the method I learned is to use an org.apache.flink.util.CloseableIterator<Row> to loop through each row and read each field by position. But this is too cumbersome. Is there any way to directly convert a Table into my business POJO?

In addition, is there a way to insert a POJO into the Flink table? I have not found a suitable method.

thanks.

Json
12.13

Re: Is there any way to directly convert org.apache.flink.table.api.Table into my POJO?

Posted by Timo Walther <tw...@apache.org>.
Hi,

first, we should clarify "continue to be put into the Flink table": A 
Flink Table object does not physically store the data. It is basically a 
view that contains a transformation pipeline.

When you are calling `collect()` the pipeline is executed and all 
results from the cluster are streamed to one local machine (this might 
be a bottleneck when processing large data). It might reveal a design 
issue in your pipeline because ideally all logic should be expressed in 
Flink SQL or DataStream API transformations.

In general, Flink SQL comes with basic structured type support. A 
structured type is basically a business POJO. Starting from Flink 1.11, 
a structured type can be created and passed through UDFs. However, 
connectors and collect() cannot return them yet. If you really don't 
want to implement conversion logic yourself, you can also take a look at 
internal converters:
org.apache.flink.table.data.conversion.DataStructureConverters
In theory, you can convert from Row -> RowData -> POJO.
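
If you do end up writing the conversion logic yourself, it can be written
once in a generic way instead of once per query. The following is only a
minimal sketch using plain Java reflection, not the internal converters
mentioned above. It assumes that the column names match the public field
names of the POJO. `RowToPojo`, `LogEvent` and `convert` are hypothetical
names, and a plain `Object[]` stands in for a Flink `Row` (in a real job the
values would come from `row.getField(i)`).

```java
import java.lang.reflect.Field;

public class RowToPojo {

    /** Hypothetical business POJO: public fields and a public no-arg constructor. */
    public static class LogEvent {
        public String host;
        public Long timestamp;
        public String message;

        public LogEvent() {}
    }

    /**
     * Copies positional values into a new instance of pojoClass.
     * fieldNames[i] names the public field that receives values[i].
     * The Object[] is a stand-in for the fields of a Flink Row (assumption).
     */
    public static <T> T convert(String[] fieldNames, Object[] values, Class<T> pojoClass) {
        if (fieldNames.length != values.length) {
            throw new IllegalArgumentException(
                    "Expected " + fieldNames.length + " values but got " + values.length);
        }
        try {
            T pojo = pojoClass.getDeclaredConstructor().newInstance();
            for (int i = 0; i < fieldNames.length; i++) {
                // Look up the public field by column name and assign the value.
                Field field = pojoClass.getField(fieldNames[i]);
                field.set(pojo, values[i]);
            }
            return pojo;
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot map row to " + pojoClass.getName(), e);
        }
    }

    public static void main(String[] args) {
        String[] columns = {"host", "timestamp", "message"};
        Object[] row = {"web-1", 1607839036000L, "disk full"};

        LogEvent event = convert(columns, row, LogEvent.class);
        System.out.println(event.host + " " + event.timestamp + " " + event.message);
    }
}
```

Reflection keeps the sketch short, but it checks types only at runtime: a
column whose value type does not match the field type fails with an
IllegalArgumentException when the field is set.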

I hope this helps.

Regards,
Timo

