You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/11/12 09:46:19 UTC

[GitHub] yiduwangkai opened a new pull request #7080: flink sqlUpdate support complex insert

yiduwangkai opened a new pull request #7080: flink sqlUpdate support complex insert
URL: https://github.com/apache/flink/pull/7080
 
 
   My code is:
   `tableEnv.sqlUpdate("insert into kafka.sdkafka.product_4 select filedName1, filedName2 from kafka.sdkafka.order_4");`
   
   but Flink fails with the error "No table was registered under the name kafka".
   After I modified the code as shown below, it works.
   The current code in TableEnvironment.scala is:
   ` def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
       // Runs an INSERT statement: the statement's source query is validated and wrapped in a
       // Table, which is then passed to insertInto together with the sink table name and config.
       // Any other kind of statement is rejected with a TableException.
       val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
       // parse the sql query
       val parsed = planner.parse(stmt)
       parsed match {
         case insert: SqlInsert =>
           // validate the SQL query
           val query = insert.getSource
           val validatedQuery = planner.validate(query)
   
           // get query result as Table
           val queryResult = new Table(this, LogicalRelNode(planner.rel(validatedQuery).rel))
   
           // get name of sink table
           // NOTE: only the first entry of the target identifier's name list is kept (names.get(0)),
           // so a qualified target such as kafka.sdkafka.product_4 yields only kafka as the sink
           // name, which matches the reported error. The cast also assumes the target node is a
           // SqlIdentifier.
           val targetTableName = insert.getTargetTable.asInstanceOf[SqlIdentifier].names.get(0)
   
           // insert query result into sink table
           insertInto(queryResult, targetTableName, config)
         case _ =>
           // statements that are not INSERT are not handled by this method
           throw new TableException(
             "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.")
       }
     }`
   
   It should be changed to this:
   
   `def sqlUpdate(stmt: String, config: QueryConfig): Unit = {
       // Runs an INSERT statement: the statement's source query is validated and wrapped in a
       // Table, which is then passed to insertInto together with the sink table name and config.
       // Any other kind of statement is rejected with a TableException.
       val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
       // parse the sql query
       val parsed = planner.parse(stmt)
       parsed match {
         case insert: SqlInsert =>
           // validate the SQL query
           val query = insert.getSource
           val validatedQuery = planner.validate(query)
   
           // get query result as Table
           val queryResult = new Table(this, LogicalRelNode(planner.rel(validatedQuery).rel))
   
           // get name of sink table
           // Use the string form of the whole target node rather than only the first entry of its
           // name list (names.get(0)), so that a qualified target such as kafka.sdkafka.product_4
           // is not cut down to kafka. Presumably toString renders the full dotted path; confirm
           // against Calcite's SqlIdentifier.
           val targetTableName = insert.getTargetTable.toString
   
           // insert query result into sink table
           insertInto(queryResult, targetTableName, config)
         case _ =>
           // statements that are not INSERT are not handled by this method
           throw new TableException(
             "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.")
       }
     }`
   
   
   I hope this can be accepted, thanks.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services