You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/11/12 09:46:19 UTC
[GitHub] yiduwangkai opened a new pull request #7080: flink sqlUpdate
support complex insert
yiduwangkai opened a new pull request #7080: flink sqlUpdate support complex insert
URL: https://github.com/apache/flink/pull/7080
My code is:
`tableEnv.sqlUpdate("insert into kafka.sdkafka.product_4 select filedName1, filedName2 from kafka.sdkafka.order_4");`
But Flink fails with the error "No table was registered under the name kafka".
After I modified the code as shown below, it works.
TableEnvironment.scala
` def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
// Parses stmt and, when it is an INSERT statement, validates its source query and
// inserts the query result into the sink table named by the statement's target.
// Any other statement type is rejected with a TableException.
val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
// parse the sql query
val parsed = planner.parse(stmt)
parsed match {
case insert: SqlInsert =>
// validate the SQL query (only the source SELECT part of the INSERT is validated here)
val query = insert.getSource
val validatedQuery = planner.validate(query)
// get query result as Table
val queryResult = new Table(this, LogicalRelNode(planner.rel(validatedQuery).rel))
// get name of sink table
// NOTE(review): names.get(0) keeps only the FIRST component of the target identifier,
// so a compound target such as kafka.sdkafka.product_4 is reduced to kafka. This is
// the behavior reported as failing in this message. The cast also assumes the target
// node is a SqlIdentifier.
val targetTableName = insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)
// insert query result into sink table
insertInto(queryResult, targetTableName, config)
case _ =>
// every non-INSERT statement is rejected
throw new TableException(
"Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.")
}
}`
It should be changed to this:
`def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
// Parses stmt and, when it is an INSERT statement, validates its source query and
// inserts the query result into the sink table named by the statement's target.
// Any other statement type is rejected with a TableException.
val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
// parse the sql query
val parsed = planner.parse(stmt)
parsed match {
case insert: SqlInsert =>
// validate the SQL query (only the source SELECT part of the INSERT is validated here)
val query = insert.getSource
val validatedQuery = planner.validate(query)
// get query result as Table
val queryResult = new Table(this, LogicalRelNode(planner.rel(validatedQuery).rel))
// get name of sink table
// previous implementation, kept for comparison: it used only the first name component
//val targetTableName = insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)
// NOTE(review): relies on the target node's toString; presumably this yields the full
// dotted identifier (e.g. kafka.sdkafka.product_4) -- confirm against Calcite's
// SqlIdentifier.toString and against how insertInto resolves dotted names.
val targetTableName = insert.getTargetTable.toString
// insert query result into sink table
insertInto(queryResult, targetTableName, config)
case _ =>
// every non-INSERT statement is rejected
throw new TableException(
"Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.")
}
}`
I hope this can be accepted, thanks.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services