Posted to dev@flink.apache.org by Lin Li <li...@gmail.com> on 2017/07/31 04:50:09 UTC

[DISCUSS] api changes to support dml operations like ‘insert into’ clause in TableAPI&SQL

Hi everybody,
  I'd like to propose and discuss some API changes to support DML
operations like the 'INSERT INTO' clause in the Table API & SQL.
This was originally discussed with Fabian in the PR conversation (see
https://github.com/apache/flink/pull/3829). Since it involves several API
changes, I'm starting this mailing list thread to discuss it.
# Motivation

Currently the Table API only provides registration methods for source
tables. When we write a streaming job in SQL, we have to add additional
code for the sink, as the Table API does:

val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3"
val t = StreamTestData.getSmall3TupleDataStream(env)
tEnv.registerDataStream("MyTable", t)

// one way: invoke the Table API's writeToSink method directly
val result = tEnv.sql(sqlQuery)
result.writeToSink(new YourStreamSink)

// another way: convert to a DataStream first and then invoke addSink
val stream = tEnv.sql(sqlQuery).toDataStream[Row]
stream.addSink(new StreamITCase.StringSink)

From the API we can see that the sink table is always a derived table,
because its schema is inferred from the result type of the upstream query.

Compared to a traditional RDBMS, which supports DML syntax, a query with a
target output could be written like this:

INSERT INTO target_table_name
  [ (column_name [, ...n]) ]
  query

The equivalent form of the example above is as follows:

tEnv.registerTableSink("targetTable", new YourSink)
val sql = "INSERT INTO targetTable SELECT a, b, c FROM sourceTable"
val result = tEnv.sql(sql)

This is supported by Calcite's grammar:

insert:
  ( INSERT | UPSERT ) INTO tablePrimary
  [ '(' column [, column ]* ')' ]
  query

I'd like to extend the Flink Table API to support this feature.
# Proposed changes

1. Support registering a sink table (analogous to source table
registration; validation will be performed against the registered table):

/**
 * Registers an external [[TableSink]] in this [[TableEnvironment]]'s catalog.
 * Registered sink tables can be referenced in SQL DML clauses.
 *
 * @param name The name under which the [[TableSink]] is registered.
 * @param tableSink The [[TableSink]] to register.
 */
def registerTableSink(name: String, tableSink: TableSink[_]): Unit
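
For example, a minimal registration sketch (CsvTableSink is the file sink
bundled with flink-table; the path and delimiter are placeholders, and I
assume the sink's schema matches the insert query):

import org.apache.flink.table.sinks.{CsvTableSink, TableSink}

// register a concrete sink under a name that SQL can reference
val sink: TableSink[_] = new CsvTableSink("/tmp/out.csv", fieldDelim = ",")
tEnv.registerTableSink("targetTable", sink)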


2. Add two new methods to table.scala:

   - def insertInto[T](tableSink: String): Unit
   - def insertInto[T](tableSink: String, conf: QueryConfig): Unit

I propose to retain the current writeToSink method so that this is not a
breaking API change. In a sense, writeToSink is similar to the SQL 'CREATE
TABLE AS' statement in an RDBMS (which creates a table from an existing
table by copying the existing table's columns).
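
As a usage sketch for insertInto (assuming sourceTable and targetTable are
registered as above; the field names are illustrative):

import org.apache.flink.table.api.scala._

// select the fields to emit and write them to the registered sink table
val projected = tEnv.scan("sourceTable").select('a, 'b, 'c)
projected.insertInto("targetTable")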

3. Deprecate the current sql method and add two new methods to
TableEnvironment:

   - @deprecated def sql(sql: String): Table
   - def sqlQuery(sql: String): Table
   - def sqlUpdate(sql: String, config: QueryConfig): Unit

I think the sqlUpdate method here differs from JDBC's executeUpdate [1],
which returns an int value: sqlUpdate will not trigger execution
immediately, so keeping the return type as Unit sounds reasonable and does
not break the consistency between the Scala and Java APIs.
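
Putting the pieces together, a rough end-to-end sketch of the proposed API
(YourSink is a placeholder, and I assume the environment's default
queryConfig as the config argument):

// register the sink, declare the insert, then run the job as usual
tEnv.registerTableSink("targetTable", new YourSink)
tEnv.sqlUpdate(
  "INSERT INTO targetTable SELECT a, b, c FROM sourceTable",
  tEnv.queryConfig)
// nothing has run so far; env.execute() triggers the actual execution
env.execute("insert into targetTable")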

Note that:

A registered source table cannot be updated unless it is registered as a
sink table as well. So we need to add validation in both the Table API and
SQL to prevent querying a sink table or inserting into a source table
(illustrated below).

Partial column insertion into a target table is not supported, because
tables have no nullable property definition for now.
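
For illustration, hypothetical statements that this validation would reject
(otherSource stands for any table registered only as a source):

// querying a table registered only as a sink should fail validation
tEnv.sqlQuery("SELECT * FROM targetTable")
// inserting into a table registered only as a source should fail as well
tEnv.sqlUpdate("INSERT INTO otherSource SELECT a, b, c FROM sourceTable",
  tEnv.queryConfig)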

Ref:
[1]
https://docs.oracle.com/javase/7/docs/api/java/sql/PreparedStatement.html


doc link: https://goo.gl/n3phK5

What do you think?

Best, Lincoln

Re: [DISCUSS] api changes to support dml operations like ‘insert into’ clause in TableAPI&SQL

Posted by Shaoxuan Wang <ws...@gmail.com>.
+1 to supporting this change, as it makes the SQL API more accurate and
elegant.
I hope this will not introduce too much trouble in the release upgrade for
existing Flink SQL users.



Re: [DISCUSS] api changes to support dml operations like ‘insert into’ clause in TableAPI&SQL

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Lincoln,

thank you for this proposal and discussing the motivation for this change.
I think this makes a lot of sense (as you said, we discussed this before).

I'd like to highlight the breaking change (among several non-breaking
changes) proposed here:

We propose to deprecate TableEnvironment.sql(sql: String): Table and
replace it with TableEnvironment.sqlQuery(sql: String): Table.

The reasons for this change are:
- We need a sqlUpdate() method that does not return a Table. For now the
use case is "INSERT INTO x SELECT ..." but there are other DML statements
as well.
- In order to better distinguish query and update functionality, we would
like to rename the sql() method to sqlQuery().
- We want to name the SQL methods similarly to their JDBC counterparts. In
JDBC the methods are executeQuery(): ResultSet and executeUpdate(): int.
Since the Table API is not only SQL, we think that sqlQuery() and
sqlUpdate() are good method names for this functionality.

What do others think?

Fabian


