You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "frank wang (JIRA)" <ji...@apache.org> on 2018/11/12 10:09:00 UTC

[jira] [Created] (FLINK-10851) sqlUpdate support complex insert grammar

frank wang created FLINK-10851:
----------------------------------

             Summary: sqlUpdate support complex insert grammar
                 Key: FLINK-10851
                 URL: https://issues.apache.org/jira/browse/FLINK-10851
             Project: Flink
          Issue Type: Bug
            Reporter: frank wang


My code is:
{{tableEnv.sqlUpdate("insert into kafka.sdkafka.product_4 select filedName1, filedName2 from kafka.sdkafka.order_4");}}

But Flink gives me an error saying "No table was registered under the name kafka".
I modified the code, and it works now. The current code is in
TableEnvironment.scala


{code:java}
/**
  * Evaluates a SQL statement; only INSERT statements are accepted.
  *
  * The statement is parsed, and for an INSERT its source query is validated, converted into a
  * [[Table]] and passed to `insertInto` together with the name of the target table.
  *
  * @param stmt   the SQL statement to evaluate
  * @param config the [[QueryConfig]] handed on to `insertInto`
  * @throws TableException if the parsed statement is not a `SqlInsert`
  */
def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
  val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
  // parse the SQL statement
  val parsed = planner.parse(stmt)
  parsed match {
    case insert: SqlInsert =>
      // validate only the source (SELECT) part of the INSERT statement
      val query = insert.getSource
      val validatedQuery = planner.validate(query)

      // get query result as Table
      val queryResult = new Table(this, LogicalRelNode(planner.rel(validatedQuery).rel))

      // get name of sink table
      // NOTE(review): only the first element of the identifier's `names` is used, so a target
      // written as `kafka.sdkafka.product_4` is passed on as just `kafka` -- this matches the
      // reported "No table was registered under the name kafka" error.
      val targetTableName = insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)

      // insert query result into sink table
      insertInto(queryResult, targetTableName, config)
    case _ =>
      // any statement that is not an INSERT is rejected
      throw new TableException(
        "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.")
  }
}
{code}
It should be modified to this:
{code:java}
/**
  * Evaluates a SQL statement; only INSERT statements are accepted.
  *
  * The statement is parsed, and for an INSERT its source query is validated, converted into a
  * [[Table]] and passed to `insertInto` together with the name of the target table.
  *
  * @param stmt   the SQL statement to evaluate
  * @param config the [[QueryConfig]] handed on to `insertInto`
  * @throws TableException if the parsed statement is not a `SqlInsert`
  */
def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
  val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
  // parse the SQL statement
  val parsed = planner.parse(stmt)
  parsed match {
    case insert: SqlInsert =>
      // validate only the source (SELECT) part of the INSERT statement
      val query = insert.getSource
      val validatedQuery = planner.validate(query)

      // get query result as Table
      val queryResult = new Table(this, LogicalRelNode(planner.rel(validatedQuery).rel))

      // get name of sink table
      // previous version, which kept only the first name component of the target identifier:
      //val targetTableName = insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)
      // Proposed change: use the string form of the whole target node instead.
      // NOTE(review): presumably this yields the full dotted name (e.g. `kafka.sdkafka.product_4`);
      // TODO confirm the exact format of toString and that insertInto accepts it.
      val targetTableName = insert.getTargetTable.toString

      // insert query result into sink table
      insertInto(queryResult, targetTableName, config)
    case _ =>
      // any statement that is not an INSERT is rejected
      throw new TableException(
        "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.")
  }
}
{code}
 

I hope this can be accepted, thanks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)