Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/05/05 06:42:04 UTC

[jira] [Commented] (FLINK-6442) Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in SQL

    [ https://issues.apache.org/jira/browse/FLINK-6442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15997851#comment-15997851 ] 

ASF GitHub Bot commented on FLINK-6442:
---------------------------------------

GitHub user lincoln-lil opened a pull request:

    https://github.com/apache/flink/pull/3829

    [FLINK-6442] [table] Extend TableAPI Support Sink Table Registration …

    Support sink table registration and the 'INSERT INTO' clause in SQL:
    1. support registering a sink table (analogous to source table registration); an INSERT is validated against the registered table
    2. support the 'INSERT INTO' clause in SQL
    3. inserting into a subset of the target table's columns is not supported, because table columns have no nullability definition for now
    
    I tried to translate the whole SQL string via converter rules, but in Flink a sink operation produces no output, while every RelNode's translation must return a DataSet/DataStream, and Calcite's TableModify defines the output of DML (INSERT, DELETE, UPDATE) as a single row-count column of BIGINT type.
    So the translation takes a separate path when the DML is an INSERT. This is a workaround, since we have no conclusion yet on the output type of a sink operation (compared to a traditional SQL INSERT).
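The INSERT-specific translation path described above could be sketched roughly as follows. This is a hypothetical, simplified illustration in plain Scala; `SqlDispatch`, `Stmt`, and the naive string parsing are illustrative stand-ins, not Flink's or Calcite's actual API (Flink would dispatch on the parsed SqlNode kind, not on strings):

```scala
// Hypothetical sketch: decide up front whether a SQL string is a plain
// query (translated to a Table) or an INSERT (routed to the sink path),
// since an INSERT has no DataSet/DataStream result to hand back.
object SqlDispatch {
  sealed trait Stmt
  case class Query(sql: String) extends Stmt
  case class Insert(target: String, query: String) extends Stmt

  def parse(sql: String): Stmt = {
    val trimmed = sql.trim
    val prefix = "INSERT INTO "
    if (trimmed.toUpperCase.startsWith(prefix)) {
      // Split "INSERT INTO <table> <query>"; assumes a query follows the
      // target table name (a real parser would validate this properly).
      val parts = trimmed.drop(prefix.length).trim.split("\\s+", 2)
      Insert(parts(0), parts(1))
    } else {
      Query(trimmed)
    }
  }
}
```

A caller would pattern-match on the result: `Query` is translated as before, while `Insert` looks up the registered sink table and emits the write.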

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/lincoln-lil/flink FLINK-6442

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3829.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3829
    
----
commit 4aec1f6d6167ab7f0be9ace3d9f51a877d21ef46
Author: lincoln-lil <li...@gmail.com>
Date:   2017-05-04T09:52:34Z

    [FLINK-6442] [table] Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in SQL

----


> Extend TableAPI Support Sink Table Registration and ‘insert into’ Clause in SQL
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-6442
>                 URL: https://issues.apache.org/jira/browse/FLINK-6442
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: lincoln.lee
>            Assignee: lincoln.lee
>            Priority: Minor
>
> Currently the Table API only provides registration methods for source tables, so when we write a streaming job in SQL we have to add an extra step for the sink, as the Table API does:
> {code}
> val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3"
> val t = StreamTestData.getSmall3TupleDataStream(env)
> tEnv.registerDataStream("MyTable", t)
> // one way: invoke the Table API's writeToSink method directly
> val result = tEnv.sql(sqlQuery)
> result.writeToSink(new YourStreamSink)
> // another way: convert to datastream first and then invoke addSink 
> val result = tEnv.sql(sqlQuery).toDataStream[Row]
> result.addSink(new StreamITCase.StringSink)
> {code}
> From this API we can see that the sink table is always a derived table, because its 'schema' is inferred from the result type of the upstream query.
> By comparison, in a traditional RDBMS supporting DML syntax, a query with a target output can be written like this:
> {code}
> insert into table target_table_name
> [(column_name [ ,...n ])]
> query
> {code}
> The equivalent form of the example above is as follows:
> {code}
>     tEnv.registerTableSink("targetTable", new YourSink)
>     val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
>     val result = tEnv.sql(sql)
> {code}
> This syntax is already supported by Calcite’s grammar:
> {code}
>  insert:
>      ( INSERT | UPSERT ) INTO tablePrimary
>      [ '(' column [, column ]* ')' ]
>      query
> {code}
> I'd like to extend the Flink Table API to support this feature. See the design doc: https://goo.gl/n3phK5
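The validation mentioned in point 1 of the pull request (checking an INSERT against the registered sink table, with partial-column inserts rejected since there is no nullability information yet) could be sketched as below. This is a hypothetical illustration in plain Scala; `SchemaCheck`, `FieldType`, and `validateInsert` are made-up names, not Flink classes:

```scala
// Hypothetical sketch: validate that the query's result schema matches
// the registered sink table's schema field-for-field. Because columns
// have no nullability definition yet, inserting into a subset of the
// target columns is rejected (field counts must match exactly).
object SchemaCheck {
  case class FieldType(name: String, tpe: String)

  def validateInsert(
      sinkSchema: Seq[FieldType],
      querySchema: Seq[FieldType]): Either[String, Unit] = {
    if (sinkSchema.length != querySchema.length) {
      Left(s"field count mismatch: sink has ${sinkSchema.length}, " +
        s"query produces ${querySchema.length}")
    } else {
      // Fields line up positionally; every type must match.
      sinkSchema.zip(querySchema).find { case (s, q) => s.tpe != q.tpe } match {
        case Some((s, q)) =>
          Left(s"type mismatch on '${s.name}': sink expects ${s.tpe}, " +
            s"query produces ${q.tpe}")
        case None => Right(())
      }
    }
  }
}
```

A `Left` would surface as a validation error at registration/translation time, before any job is submitted.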



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)