You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Caizhi Weng (Jira)" <ji...@apache.org> on 2020/02/10 05:02:00 UTC

[jira] [Comment Edited] (FLINK-14807) Add Table#collect api for fetching data to client

    [ https://issues.apache.org/jira/browse/FLINK-14807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17033373#comment-17033373 ] 

Caizhi Weng edited comment on FLINK-14807 at 2/10/20 5:01 AM:
--------------------------------------------------------------

Hi dear Flink community,

I'm digging this up because I have some thoughts on implementing this. I'll post my thoughts below and I'd love to hear your opinions.

From my understanding, this problem has two key points. One is that where to store the (possibly) never-ending results, and the other is that task managers can directly communicate with clients under some environments like k8s or yarn. For the never-ending results, back pressuring will work to limit the size of data in the whole cluster. For the communication between task managers and clients, job manager must be the man in the middle as clients and task managers are guaranteed to directly communicate with job manager.

As REST API is the main communication method between clients and job managers, I'm going to introduce two new internal REST API and a new sink to do the job. The new sink will only have 1 parallelism. The class names and API names below are just placeholders.

!table-collect.png|width=600!
 # When the sink is initialized in task managers, it will create a socket server for providing the query results. The IP and port of the socket server will be given to the REST server by a REST API call.
 # When client want to fetch a portion of the query result, it will provide the job id and a token to the REST server. Here token is an integer indicating the index of the first returning result. Note that token must be non-decreasing across several API calls for the same job.
 # REST server receives the API call and ask the socket server for results with a maximum size.
 # The socket server returns some results to the REST server.
 # REST server forwards the result to the client.

Some Q&As for the design above:
 * TM memories are protected by backpressuring. Do we have to introduce a new config option to set the maximum memory usage of the sink?
 >> Yes
 * What if the client disconnects?
 >> REST API is a non-stateful API so it will not suffer from disconnecting. With each API call the client must provide a token to the server indicating the index of the first result it would like to read.
  * What if the client does not connect?
 >> The job will not finish as the sink isn't finished.
  * What if the job restarts?
 >> For streaming jobs we have states to recover from so it's not a big problem. For batch jobs, as clients will provide tokens to the REST server, the REST server can just skip the results before the token and continue to work normally.
  * How to deal with retract / upsert streams?
 >> The return type will be Tuple2<Boolean, Row> where the first boolean value indicates this row is an appending row or a retracting row.
  * Is the 1st step necessary?
 >> Yes, because the port of the socket server is unknown before created.


was (Author: tsreaper):
Hi dear Flink community,

I'm digging this up because I have some thoughts on implementing this. I'll post my thoughts below and I'd love to hear your opinions.

From my understanding, this problem has two key points. One is that where to store the (possibly) never-ending results, and the other is that task managers can directly communicate with clients under some environments like k8s or yarn. For the never-ending results, back pressuring will work to limit the size of data in the whole cluster. For the communication between task managers and clients, job manager must be the man in the middle as clients and task managers are guaranteed to directly communicate with job manager.

As REST API is the main communication method between clients and job managers, I'm going to introduce two new internal REST API and a new sink to do the job. The new sink will only have 1 parallelism. The class names and API names below are just placeholders.

!table-collect.png|width=600!
 # When the sink is initialized in task managers, it will create a socket server for providing the query results. The IP and port of the socket server will be given to the REST server by a REST API call.
 # When client want to fetch a portion of the query result, it will provide the job id and a token to the REST server. Here token is an integer indicating the index of the first returning result. Note that token must be non-decreasing across several API calls for the same job.
 # REST server receives the API call and ask the socket server for results with a maximum size.
 # The socket server returns some results to the REST server.
 # REST server forwards the result to the client.

Some Q&As for the design above:
 * TM memories are protected by backpressuring. Do we have to introduce a new config option to set the maximum memory usage of the sink?
 >> Yes
 * What if the client disconnects?
 >> REST API is a non-stateful API so it will not suffer from disconnecting. With each API call the client must provide a token to the server indicating the index of the first result it would like to read.
  * What if the client does not connect?
 >> The job will not finish as the sink isn't finished.
  * What if the job restarts?
 >> For streaming jobs we have states to recover from so it's not a big problem. For batch jobs, as clients will provide tokens to the REST server, the REST server can just skip the results before the token and continue to work normally.
  * How to deal with retract / upsert streams?
 >> The return type will be Tuple2<Boolean, Row> where the first boolean value indicates this row is an appending row or a retracting row.

> Add Table#collect api for fetching data to client
> -------------------------------------------------
>
>                 Key: FLINK-14807
>                 URL: https://issues.apache.org/jira/browse/FLINK-14807
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / API
>    Affects Versions: 1.9.1
>            Reporter: Jeff Zhang
>            Priority: Major
>              Labels: usability
>             Fix For: 1.11.0
>
>         Attachments: table-collect.png
>
>
> Currently, it is very unconvinient for user to fetch data of flink job unless specify sink expclitly and then fetch data from this sink via its api (e.g. write to hdfs sink, then read data from hdfs). However, most of time user just want to get the data and do whatever processing he want. So it is very necessary for flink to provide api Table#collect for this purpose. 
>  
> Other apis such as Table#head, Table#print is also helpful.  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)