Posted to user-zh@flink.apache.org by sunfulin <su...@163.com> on 2020/04/01 06:31:41 UTC

Exception when writing to Hive in real time with Flink

Hi,
I am using Flink to consume Kafka data and write it into Hive. The connection configuration is all OK, but when the actual insert into xxx_table executes, the exception below is thrown. I cannot tell what is causing it; any guidance would be appreciated.
cc  @Jingsong Li  @Jark Wu
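(For illustration: a minimal sketch of the kind of Table API job involved, assuming the Flink 1.10 blink planner; the table names "kafka_source" and "xxx_table" here and the job name are hypothetical placeholders, not taken from the actual job.)

    // Hypothetical sketch of a Kafka -> Hive insert via the Table API.
    EnvironmentSettings settings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build();
    TableEnvironment tableEnv = TableEnvironment.create(settings);

    // kafka_source is assumed to be a Kafka-backed table already registered in
    // the catalog; xxx_table is the Hive target table. This INSERT is the kind
    // of statement that triggers the exception below.
    tableEnv.sqlUpdate("INSERT INTO xxx_table SELECT * FROM kafka_source");
    tableEnv.execute("kafka-to-hive-sync");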




org.apache.flink.table.api.TableException: Stream Tables can only be emitted by AppendStreamTableSink, RetractStreamTableSink, or UpsertStreamTableSink.
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:136)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
 at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan(ExecNode.scala:58)
 at org.apache.flink.table.planner.plan.nodes.exec.ExecNode.translateToPlan$(ExecNode.scala:56)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
 at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:60)
 at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
 at scala.collection.Iterator.foreach(Iterator.scala:937)
 at scala.collection.Iterator.foreach$(Iterator.scala:937)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
 at scala.collection.IterableLike.foreach(IterableLike.scala:70)
 at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
 at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
 at scala.collection.TraversableLike.map(TraversableLike.scala:233)
 at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
 at scala.collection.AbstractTraversable.map(Traversable.scala:104)
 at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
 at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
 at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
 at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
 at com.htsc.crm_realtime.fatjob.Core.TableLoader.sqlUpdate(TableLoader.java:87)
 at com.htsc.crm_realtime.fatjob.Jobs.hive.SensorData2Hive.doJob(SensorData2Hive.j

Re: Exception when writing to Hive in real time with Flink

Posted by Jingsong Li <ji...@gmail.com>.
Yes, they are related; the umbrella issue here is FLIP-115.

Best,
Jingsong Lee

On Wed, Apr 1, 2020 at 10:57 PM 叶贤勋 <yx...@163.com> wrote:

> Hi jingsong,
> I see that on issue [1] you opened two PRs about supporting a Hive streaming sink. Is that code related to FLIP-115?
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-14255
>
>
> 叶贤勋
> yxx_cmhd@163.com
>
>
> On Apr 1, 2020, 16:28, 111 <xi...@163.com> wrote:
> Hi jingsong,
> That is impressive; it is effectively a data lake plugin built inside Flink.
> Best,
> Xinghalo



-- 
Best, Jingsong Lee

Re: Exception when writing to Hive in real time with Flink

Posted by 叶贤勋 <yx...@163.com>.
Hi jingsong,
I see that on issue [1] you opened two PRs about supporting a Hive streaming sink. Is that code related to FLIP-115?


[1] https://issues.apache.org/jira/browse/FLINK-14255


叶贤勋
yxx_cmhd@163.com


On Apr 1, 2020, 16:28, 111 <xi...@163.com> wrote:
Hi jingsong,
That is impressive; it is effectively a data lake plugin built inside Flink.
Best,
Xinghalo

Re: Exception when writing to Hive in real time with Flink

Posted by 111 <xi...@163.com>.
Hi jingsong,
That is impressive; it is effectively a data lake plugin built inside Flink.
Best,
Xinghalo

Re: Exception when writing to Hive in real time with Flink

Posted by Jingsong Li <ji...@gmail.com>.
Hi 111,

Although a data lake can take over some of this, streaming writes to Hive are still an important part of a Hive data warehouse.

On the file-count problem:
- It depends on the checkpoint interval (see the sketch below): if a file can reach 128 MB within one checkpoint interval, that is already a very suitable file size for HDFS.
- For streaming writes, features such as file compaction can also be introduced; this is discussed in FLIP-115 as well.
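(For illustration: a minimal sketch of the checkpoint-interval setting that governs how long a streaming file sink keeps a file open, assuming the DataStream-level API; the 5-minute value is a made-up example, not a recommendation from this thread.)

    // Hypothetical sketch: the checkpoint interval bounds how long a streaming
    // sink keeps writing into one file, and therefore how large files can grow.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Checkpoint every 5 minutes; with enough throughput a file can approach
    // the HDFS-friendly ~128 MB within a single interval.
    env.enableCheckpointing(5 * 60 * 1000L);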

Best,
Jingsong Lee

On Wed, Apr 1, 2020 at 4:06 PM 111 <xi...@163.com> wrote:

>
>
> Hi,
> Streaming writes into Hive really belong to the data lake concept.
> Because streaming into Hive produces many small fragment files, which degrade HDFS performance, Hive is generally not written to directly in streaming scenarios.
> For details, look at Delta Lake or Hudi.
>
>
> On Apr 1, 2020, 15:05, sunfulin <su...@163.com> wrote:
> Hi,
> The scenario is actually simple: use Flink to sync Kafka data into Hive in real time. A partitioned table has been created in Hive.
> I think this scenario is quite common. I assumed it was supported, since a Kafka table can be created through the HiveCatalog; being able to create it but not write to it seems unreasonable.
> OK then. Which release is FLIP-115 planned for? 1.11?
>
> [remainder of the earlier thread, including the original stack trace, quoted here; elided]


-- 
Best, Jingsong Lee

Re: Re: Exception when writing to Hive in real time with Flink

Posted by sunfulin <su...@163.com>.





hi,
A question: with the current functionality, can a Hive table be used as a dimension table in a join, that is, as a temporal table?
If so, how would a partitioned Hive table be joined? A dimension-table join usually needs the full data of the latest partition.
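(For illustration: a minimal sketch of Flink SQL's dimension-table, i.e. temporal, join syntax, assuming a TableEnvironment named tableEnv; the table and column names are invented, and whether a Hive table can serve as the lookup side, or be restricted to its latest partition, depends on version support that this thread does not confirm.)

    // Hypothetical sketch of a processing-time temporal join against a
    // dimension table in Flink SQL.
    tableEnv.sqlUpdate(
        "INSERT INTO result_table "
            + "SELECT o.order_id, d.dim_value "
            + "FROM orders AS o "
            + "JOIN dim_table FOR SYSTEM_TIME AS OF o.proctime AS d "
            + "ON o.dim_key = d.dim_key");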

On 2020-04-02 17:30:39, "111" <xi...@163.com> wrote:
>Hi,
>As long as the storage fragmentation, read-write conflicts, and version rollback caused by upserts can be solved, real-time writes into Hive are fine too; Spark Delta Lake has already achieved this.
>As jingsong mentioned earlier, the file storage and compaction issues will be addressed, and then real-time writes from Flink into Hive will be fine.
>Best,
>Xinghalo

Re: Exception when writing to Hive in real time with Flink

Posted by 111 <xi...@163.com>.
Hi,
As long as the storage fragmentation, read-write conflicts, and version rollback caused by upserts can be solved, real-time writes into Hive are fine too; Spark Delta Lake has already achieved this.
As jingsong mentioned earlier, the file storage and compaction issues will be addressed, and then real-time writes from Flink into Hive will be fine.
Best,
Xinghalo

Re: Exception when writing to Hive in real time with Flink

Posted by Leonard Xu <xb...@gmail.com>.
It looks like the stream -> (Flink SQL) -> Hive scenario is one everyone cares about; FLIP-115, once completed, is expected to cover it.

Best,
Leonard

> On Apr 2, 2020, 17:10, sunfulin <su...@163.com> wrote:
> 
> Hi,
> This touches on the topic of how to integrate real-time and offline data warehouses in practice. I can understand why this is technically unreasonable, but I believe many Flink users will treat building warehouse ETL on Flink as an important scenario.
> 
> [remainder of the earlier thread, including the original stack trace, quoted here; elided]

Re: Re: Exception when writing to Hive in real time with Flink

Posted by sunfulin <su...@163.com>.


Hi,
This touches on the topic of how to integrate real-time and offline data warehouses in practice. I can understand why this is technically unreasonable, but I believe many Flink users will treat building warehouse ETL on Flink as an important scenario.

On 2020-04-01 16:05:54, "111" <xi...@163.com> wrote:
>
>Hi,
>Streaming writes into Hive really belong to the data lake concept.
>Because streaming into Hive produces many small fragment files, which degrade HDFS performance, Hive is generally not written to directly in streaming scenarios.
>For details, look at Delta Lake or Hudi.
>
>[remainder of the earlier thread, including the original stack trace, quoted here; elided]

Re: Exception when writing to Hive in real time with Flink

Posted by 111 <xi...@163.com>.

Hi,
Streaming writes into Hive really belong to the data lake concept.
Because streaming into Hive produces many small fragment files, which degrade HDFS performance, Hive is generally not written to directly in streaming scenarios.
For details, look at Delta Lake or Hudi.


On Apr 1, 2020, 15:05, sunfulin <su...@163.com> wrote:
Hi,
The scenario is actually simple: use Flink to sync Kafka data into Hive in real time. A partitioned table has been created in Hive.
I think this scenario is quite common. I assumed it was supported, since a Kafka table can be created through the HiveCatalog; being able to create it but not write to it seems unreasonable.
OK then. Which release is FLIP-115 planned for? 1.11?

[remainder of the earlier thread, including the original stack trace, quoted here; elided]

Re: Re: Re: Exception when writing to Hive in real time with Flink

Posted by Jingsong Li <ji...@gmail.com>.
Unfortunately, Flink SQL has indeed never supported this.
Yes, it is one of the important goals for 1.11.

Best,
Jingsong Lee

On Wed, Apr 1, 2020 at 3:05 PM sunfulin <su...@163.com> wrote:

> Hi,
> The scenario is actually simple: use Flink to sync Kafka data into Hive in real time. A partitioned table has been created in Hive.
> I think this scenario is quite common. I assumed it was supported, since a Kafka table can be created through the HiveCatalog; being able to create it but not write to it seems unreasonable.
> OK then. Which release is FLIP-115 planned for? 1.11?
>
> [remainder of the earlier thread, including the original stack trace, quoted here; elided]


-- 
Best, Jingsong Lee

Re: Re: Re: Exception when writing to Hive in real time with Flink

Posted by sunfulin <su...@163.com>.
Hi,
The scenario is actually simple: use Flink to sync Kafka data into Hive in real time. A partitioned table has been created in Hive.
I think this scenario is quite common. I assumed it was supported, since a Kafka table can be created through the HiveCatalog (a sketch of that setup follows below); being able to create it but not write to it seems unreasonable.
OK then. Which release is FLIP-115 planned for? 1.11?
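(For illustration: a minimal sketch of registering a HiveCatalog so that tables, including a Kafka-backed one, can be created and persisted in the Hive Metastore; the catalog name, conf directory, and Hive version are hypothetical assumptions, using the Flink 1.10-era constructor.)

    // Hypothetical sketch: register a HiveCatalog, then create tables through it.
    HiveCatalog hiveCatalog = new HiveCatalog(
            "myhive",          // catalog name
            "default",         // default database
            "/etc/hive/conf",  // directory containing hive-site.xml
            "2.3.4");          // Hive version
    tableEnv.registerCatalog("myhive", hiveCatalog);
    tableEnv.useCatalog("myhive");
    // A Kafka source table created via DDL now persists in the Metastore, even
    // though (pre-FLIP-115) a Hive table itself cannot be a streaming sink.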

On 2020-04-01 15:01:32, "Jingsong Li" <ji...@gmail.com> wrote:

Hi,

Using batch mode for Kafka -> Hive is not recommended either; this kind of scenario can only be supported in streaming mode after FLIP-115.

Could you describe the full stack trace, the application scenario, and the SQL?

Best,
Jingsong Lee

[remainder of the earlier thread, including the original stack trace, quoted here; elided]

Re: Re: Exception when writing to Hive in real time with Flink

Posted by Jingsong Li <ji...@gmail.com>.
Hi,

Using batch mode for Kafka -> Hive is not recommended either; this kind of scenario can only be supported in streaming mode after FLIP-115.

Could you describe the full stack trace, the application scenario, and the SQL?

Best,
Jingsong Lee

On Wed, Apr 1, 2020 at 2:56 PM sunfulin <su...@163.com> wrote:

>
> When I use batch mode, the following exception is thrown instead. It feels like a pit at every step... sigh
>
> org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not
> enough rules to produce a node with desired properties
>
>
>
>
>
>
> On 2020-04-01 14:49:41, "Jingsong Li" <ji...@gmail.com> wrote:
> >Hi,
> >
> >The exception means that the Hive sink does not yet support streaming mode; it can only be used in batch mode. The feature is under development [1].
> >
> >[1]
> >https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table
> >
> >Best,
> >Jingsong Lee
> >
> >[remainder of the earlier thread, including the original stack trace, quoted here; elided]


-- 
Best, Jingsong Lee

Re: Re: Exception when writing to Hive in real time with Flink

Posted by sunfulin <su...@163.com>.


When I use batch mode, the following exception is thrown instead. It feels like a pit at every step... sigh



org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties
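(For illustration: a minimal sketch of what "batch mode" means here, i.e. creating the TableEnvironment with the blink planner in batch mode; a Kafka source is unbounded, which is presumably why the batch planner cannot produce a plan for it.)

    // Hypothetical sketch of switching the Table API to batch mode.
    EnvironmentSettings batchSettings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inBatchMode()
            .build();
    TableEnvironment batchEnv = TableEnvironment.create(batchSettings);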

On 2020-04-01 14:49:41, "Jingsong Li" <ji...@gmail.com> wrote:
>Hi,
>
>The exception means that the Hive sink does not yet support streaming mode; it can only be used in batch mode. The feature is under development [1].
>
>[1]
>https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table
>
>Best,
>Jingsong Lee
>
>[remainder of the earlier thread, including the original stack trace, quoted here; elided]

Re: Exception when writing to Hive in real time with Flink

Posted by Jingsong Li <ji...@gmail.com>.
Hi,

The exception means that the Hive sink does not yet support streaming mode; it can only be used in batch mode. The feature is under development [1].

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table

Best,
Jingsong Lee

On Wed, Apr 1, 2020 at 2:32 PM sunfulin <su...@163.com> wrote:

> Hi,
> I am using Flink to consume Kafka data and write it into Hive. The connection configuration is all OK, but when the actual insert into
> xxx_table executes, the exception below is thrown. I cannot tell what is causing it; any guidance would be appreciated.
> cc  @Jingsong Li  @Jark Wu
>
> [original stack trace quoted here; elided; see the first message in this thread]



-- 
Best, Jingsong Lee