You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "frank wang (JIRA)" <ji...@apache.org> on 2018/11/12 10:09:00 UTC
[jira] [Created] (FLINK-10851) sqlUpdate support complex insert
grammar
frank wang created FLINK-10851:
----------------------------------
Summary: sqlUpdate support complex insert grammar
Key: FLINK-10851
URL: https://issues.apache.org/jira/browse/FLINK-10851
Project: Flink
Issue Type: Bug
Reporter: frank wang
My code is:
{{tableEnv.sqlUpdate("insert into kafka.sdkafka.product_4 select filedName1, filedName2 from kafka.sdkafka.order_4");}}
but Flink fails with the error "No table was registered under the name kafka", i.e. only the first part of the qualified sink name (kafka) is used for the lookup.
After I modified the code as shown below, the statement works.
The current code in TableEnvironment.scala:
{code:java}
/**
 * Evaluates a SQL statement that writes into a sink table; only INSERT statements are accepted.
 *
 * The statement is parsed, the source query of the INSERT is validated and wrapped in a
 * [[Table]], and that table is handed to `insertInto` together with the sink table name.
 *
 * @param stmt   the SQL statement to execute
 * @param config the query configuration forwarded to `insertInto`
 * @throws TableException if the parsed statement is not a `SqlInsert`
 */
def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
// parse the sql query
val parsed = planner.parse(stmt)
parsed match {
case insert: SqlInsert =>
// validate the SQL query
val query = insert.getSource
val validatedQuery = planner.validate(query)
// get query result as Table
val queryResult = new Table(this, LogicalRelNode(planner.rel(validatedQuery).rel))
// get name of sink table
// NOTE(review): names.get(0) keeps only the first component of the target identifier, so a
// qualified name such as kafka.sdkafka.product_4 is passed on as just "kafka". The
// asInstanceOf cast also fails with a ClassCastException if the target is not a SqlIdentifier.
val targetTableName = insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)
// insert query result into sink table
insertInto(queryResult, targetTableName, config)
case _ =>
// any statement other than INSERT is rejected
throw new TableException(
"Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.")
}
}
{code}
It should be changed to this:
{code:java}
/**
 * Runs a SQL INSERT statement: the statement's source query is validated, turned into a
 * [[Table]], and written to the sink named by the INSERT target.
 *
 * @param stmt   the SQL statement to execute; must parse to a `SqlInsert`
 * @param config the query configuration forwarded to `insertInto`
 * @throws TableException if the statement is anything other than an INSERT
 */
def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
  val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)

  // Parse the statement and dispatch on the kind of SQL node it produced.
  planner.parse(stmt) match {
    case insertStmt: SqlInsert =>
      // Validate the query that feeds the INSERT and expose its plan as a Table.
      val validatedSource = planner.validate(insertStmt.getSource)
      val sourceTable = new Table(this, LogicalRelNode(planner.rel(validatedSource).rel))

      // Take the string form of the whole target identifier instead of only its first name
      // component (names.get(0)), so a qualified sink such as kafka.sdkafka.product_4 is
      // passed on in full. Presumably toString yields the dotted path - confirm against
      // Calcite's SqlIdentifier.
      val sinkName = insertStmt.getTargetTable.toString

      // Write the query result into the sink table.
      insertInto(sourceTable, sinkName, config)

    case _ =>
      throw new TableException(
        "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.")
  }
}
{code}
I hope this change can be accepted. Thanks!
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)