Posted to issues@flink.apache.org by "Hequn Cheng (JIRA)" <ji...@apache.org> on 2018/11/12 12:45:00 UTC
[jira] [Commented] (FLINK-10851) sqlUpdate support complex insert grammar
[ https://issues.apache.org/jira/browse/FLINK-10851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16683743#comment-16683743 ]
Hequn Cheng commented on FLINK-10851:
-------------------------------------
[~frank wang] Hi, thanks for creating this issue and bringing up this discussion.
You should quote the table names with backticks ({{``}}), like this:
{{tableEnv.sqlUpdate("insert into `kafka.sdkafka.product_4` select filedName1, filedName2 from `kafka.sdkafka.order_4`");}}
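A minimal sketch of why the backticks matter, assuming the parser treats an unquoted dotted name as a multi-part identifier and the caller reads only the first part, as {{names.get(0)}} does in the quoted {{sqlUpdate}}. {{IdentifierSketch}} and {{splitIdentifier}} are hypothetical names used for illustration only; this is not Calcite's or Flink's actual parser.

```java
import java.util.ArrayList;
import java.util.List;

public class IdentifierSketch {

    // Hypothetical helper (not a Flink or Calcite API): splits a table
    // reference on dots that appear outside backtick quotes.
    public static List<String> splitIdentifier(String ref) {
        List<String> names = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean quoted = false;
        for (char c : ref.toCharArray()) {
            if (c == '`') {
                quoted = !quoted;              // toggle quoting, drop the backtick
            } else if (c == '.' && !quoted) {
                names.add(current.toString()); // unquoted dot ends one name part
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        names.add(current.toString());
        return names;
    }

    public static void main(String[] args) {
        // Unquoted: three parts, so the first part is just "kafka".
        System.out.println(splitIdentifier("kafka.sdkafka.product_4").get(0));
        // Quoted: one part that holds the full registered table name.
        System.out.println(splitIdentifier("`kafka.sdkafka.product_4`").get(0));
    }
}
```

Under these assumptions the unquoted name yields {{kafka}} as its first part, which matches the reported error message, while the quoted name yields {{kafka.sdkafka.product_4}} as a single part.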
> sqlUpdate support complex insert grammar
> ----------------------------------------
>
> Key: FLINK-10851
> URL: https://issues.apache.org/jira/browse/FLINK-10851
> Project: Flink
> Issue Type: Bug
> Reporter: frank wang
> Priority: Major
> Labels: pull-request-available
>
> My code is:
> {{tableEnv.sqlUpdate("insert into kafka.sdkafka.product_4 select filedName1, filedName2 from kafka.sdkafka.order_4");}}
> but Flink reports the error "No table was registered under the name kafka".
> After I modified the code, it works.
> In TableEnvironment.scala:
> {code:java}
> def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
>   val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
>   // parse the sql query
>   val parsed = planner.parse(stmt)
>   parsed match {
>     case insert: SqlInsert =>
>       // validate the SQL query
>       val query = insert.getSource
>       val validatedQuery = planner.validate(query)
>       // get query result as Table
>       val queryResult = new Table(this, LogicalRelNode(planner.rel(validatedQuery).rel))
>       // get name of sink table
>       val targetTableName = insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)
>       // insert query result into sink table
>       insertInto(queryResult, targetTableName, config)
>     case _ =>
>       throw new TableException(
>         "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.")
>   }
> }
> {code}
> It should be changed to this:
> {code:java}
> def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
>   val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
>   // parse the sql query
>   val parsed = planner.parse(stmt)
>   parsed match {
>     case insert: SqlInsert =>
>       // validate the SQL query
>       val query = insert.getSource
>       val validatedQuery = planner.validate(query)
>       // get query result as Table
>       val queryResult = new Table(this, LogicalRelNode(planner.rel(validatedQuery).rel))
>       // get name of sink table
>       //val targetTableName = insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)
>       val targetTableName = insert.getTargetTable.toString
>       // insert query result into sink table
>       insertInto(queryResult, targetTableName, config)
>     case _ =>
>       throw new TableException(
>         "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.")
>   }
> }
> {code}
>
> I hope this can be accepted, thanks.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)