Posted to issues@flink.apache.org by "hehuiyuan (Jira)" <ji...@apache.org> on 2019/12/27 08:17:00 UTC
[jira] [Comment Edited] (FLINK-15404) How to insert hive table for different catalog
[ https://issues.apache.org/jira/browse/FLINK-15404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17003994#comment-17003994 ]
hehuiyuan edited comment on FLINK-15404 at 12/27/19 8:16 AM:
-------------------------------------------------------------
{code:java}
tableEnv.sqlUpdate("insert into myhive.`default`.stafftest select * from default_catalog.default_database.source_table");
//-------
org.apache.flink.table.api.TableException: Stream Tables can only be emitted by AppendStreamTableSink, RetractStreamTableSink, or UpsertStreamTableSink.
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:136)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
	at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
	at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
	at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
	at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:60)
	at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:408)
	at com.flink.hive.BlinkStreamHiveTest.main(BlinkStreamHiveTest.java:130)
{code}
*Currently, inserting into a Hive table is not supported in streaming mode.*
{code:java}
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
{code}
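One possible workaround for this limitation, at the time of writing, is to run the insert in batch mode, where the Hive sink is supported by the blink planner. A minimal sketch follows; the catalog name, database, table names, and hive-conf directory are taken from this issue or are illustrative placeholders, not a confirmed setup.
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class BatchHiveInsert {
    public static void main(String[] args) throws Exception {
        // Batch mode instead of streaming mode: the Hive sink can be used here.
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inBatchMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Register the Hive catalog; "/opt/hive/conf" is a placeholder path
        // to the directory containing hive-site.xml.
        HiveCatalog hive = new HiveCatalog("myhive", "default", "/opt/hive/conf");
        tableEnv.registerCatalog("myhive", hive);

        // Fully qualified identifiers let one statement span both catalogs.
        tableEnv.sqlUpdate(
            "insert into myhive.`default`.stafftest "
            + "select * from default_catalog.default_database.source_table");
        tableEnv.execute("batch hive insert");
    }
}
{code}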
However, using this Hive table as a fixed dimension table works. If the data in the Hive table changes afterwards, though, the new data is not picked up.
{code:java}
tableEnv.sqlUpdate("insert into default_catalog.default_database.sink_table select * from default_catalog.default_database.source_table where age in (select age from myhive.`default`.staff)");
{code}
> How to insert hive table for different catalog
> -----------------------------------------------
>
> Key: FLINK-15404
> URL: https://issues.apache.org/jira/browse/FLINK-15404
> Project: Flink
> Issue Type: Wish
> Components: Table SQL / Planner
> Reporter: hehuiyuan
> Priority: Major
>
> I have a Hive catalog:
>
> {code:java}
> catalog name : myhive
> database : default
> {code}
>
> and Flink has a default catalog:
>
> {code:java}
> catalog name : default_catalog
> database : default_database
> {code}
>
> For example:
> I have a source table 'source_table' from Kafka that is registered in the default_catalog,
> and I want to insert into the Hive table 'hive_table' that is in the myhive catalog.
> SQL:
> insert into hive_table select * from source_table;
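> The unqualified statement above resolves both tables against the current catalog and database. A sketch of two ways to cross catalogs, assuming the catalog and table names from this issue:
> {code:java}
> // Option 1: fully qualified identifiers, no catalog switch needed.
> tableEnv.sqlUpdate(
>     "insert into myhive.`default`.hive_table "
>     + "select * from default_catalog.default_database.source_table");
>
> // Option 2: make myhive the current catalog, then qualify only the source.
> tableEnv.useCatalog("myhive");
> tableEnv.useDatabase("default");
> tableEnv.sqlUpdate(
>     "insert into hive_table "
>     + "select * from default_catalog.default_database.source_table");
> {code}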
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)