Posted to issues@flink.apache.org by "hehuiyuan (Jira)" <ji...@apache.org> on 2019/12/27 08:27:00 UTC

[jira] [Comment Edited] (FLINK-15404) How to insert hive table in streaming mode and blink planner

    [ https://issues.apache.org/jira/browse/FLINK-15404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17003994#comment-17003994 ] 

hehuiyuan edited comment on FLINK-15404 at 12/27/19 8:26 AM:
-------------------------------------------------------------

{code:java}
tableEnv.sqlUpdate("insert into myhive.`default`.stafftest select * from default_catalog.default_database.source_table");


//-------
org.apache.flink.table.api.TableException: Stream Tables can only be emitted by AppendStreamTableSink, RetractStreamTableSink, or UpsertStreamTableSink.
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:136)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
	at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
	at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:60)
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:408)
	at com.flink.hive.BlinkStreamHiveTest.main(BlinkStreamHiveTest.java:130)
{code}
 

 

*Currently, inserting into a Hive table is not supported in streaming mode.*
{code:java}
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

{code}
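As a sketch of the expected workaround (not confirmed in this thread): with the Flink 1.9 Blink planner, writing to a Hive table is supported in batch mode, so building the settings with `inBatchMode()` instead of `inStreamingMode()` should let the insert plan:
{code:java}
// Sketch only, assuming Flink 1.9 Blink planner APIs; catalog/table names
// reuse the ones from this comment.
EnvironmentSettings batchSettings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inBatchMode()   // batch mode instead of inStreamingMode()
        .build();
TableEnvironment batchEnv = TableEnvironment.create(batchSettings);
batchEnv.sqlUpdate("insert into myhive.`default`.stafftest select * from default_catalog.default_database.source_table");
batchEnv.execute("hive-batch-insert");
{code}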
But using this Hive table as a fixed dimension table works. However, if the data in the Hive table changes afterwards, the change is not picked up.
{code:java}
tableEnv.sqlUpdate("insert into default_catalog.default_database.sink_table select * from default_catalog.default_database.source_table  where age in (select age from myhive.`default`.staff)");


{code}
 

[~lzljs3620320], is my understanding right for the Flink 1.9 branch?



> How to insert hive table in streaming mode and blink planner
> ------------------------------------------------------------
>
>                 Key: FLINK-15404
>                 URL: https://issues.apache.org/jira/browse/FLINK-15404
>             Project: Flink
>          Issue Type: Wish
>          Components: Table SQL / Planner
>            Reporter: hehuiyuan
>            Priority: Major
>
> I have a hive catalog :
>  
> {code:java}
>     catalog name : myhive 
>     database : default
> {code}
>  
> and Flink has a default catalog:
>  
> {code:java}
>     catalog name : default_catalog
>     database : default_database
> {code}
>  
> For example:
> I have a source table 'source_table' from Kafka, which is registered in default_catalog,
> and I want to insert into the Hive table 'hive_table' from the myhive catalog.
> SQL:
> insert into hive_table select * from source_table;
>  
> And if the data of the Hive table has changed, Flink does not see the change.
> SQL:
> tableEnv.sqlUpdate("insert into myhive.`default`.stafftest select * from default_catalog.default_database.source_table");



--
This message was sent by Atlassian Jira
(v8.3.4#803005)