You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by jy l <lj...@gmail.com> on 2020/11/23 02:28:30 UTC

FlinkSQL CDC 窗口分组聚合求助

Hi:
我使用CDC组件debezium将MySQL中的维表数据实时发送到kafka,然后使用flinkSQL消费kafka的数据,具体流程如下:
[image: image.png]
[image: image.png]
分组计算的SQL如下:
[image: image.png]
在执行计算时,报了如下异常:
Exception in thread "main" org.apache.flink.table.api.TableException:
GroupWindowAggregate doesn't support consuming update and delete changes
which is produced by node TableSourceScan(table=[[default_catalog,
default_database, t_order, watermark=[-(TO_TIMESTAMP(FROM_UNIXTIME(/($1,
1000))), 3000:INTERVAL SECOND)]]], fields=[id, timestamps,
orderInformationId, userId, categoryId, productId, price, productCount,
priceSum, shipAddress, receiverAddress])
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:380)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:298)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325)
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.immutable.Range.foreach(Range.scala:155)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:325)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:275)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326)
at
org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325)
at
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)

我的初步理解是FlinkSQL 不支持这样CDC这样由插入、更新、删除的流的分组聚合。
那面对我这样的情况,该用什么方案来解决?
望知道的各位告知一下,感谢!

祝好

Re: FlinkSQL CDC 窗口分组聚合求助

Posted by jy l <lj...@gmail.com>.
好的,我试一下。谢谢

Best

Jark Wu <im...@gmail.com> 于2020年11月23日周一 下午2:06写道:

> 那是不是用非窗口聚合,开5s mini batch,是不是可以达到你的需求?
>
> Best,
> Jark
>
> On Mon, 23 Nov 2020 at 13:16, jy l <lj...@gmail.com> wrote:
>
> > 使用场景就是我们想用Flink 根据订单系统中的订单表,每5s钟计算一次总的交易金额。
> > 目前我们的系统大致架构是mysql(debezium)---->kafka--->flink---->es
> >
> > Jark Wu <im...@gmail.com> 于2020年11月23日周一 上午10:35写道:
> >
> > > Flink SQL 的 window agg 目前不支持输入含有更新和删除消息。
> > > 你可以使用非 window 聚合来代替。
> > >
> > > Btw,你可能说一下你的需求场景么? 为什么需要在  CDC 上做 window 操作呢?
> > >
> > > Best,
> > > Jark
> > >
> > > On Mon, 23 Nov 2020 at 10:28, jy l <lj...@gmail.com> wrote:
> > >
> > > > Hi:
> > > > 我使用CDC组件debezium将MySQL中的维表数据实时发送到kafka,然后使用flinkSQL消费kafka的数据,具体流程如下:
> > > > [image: image.png]
> > > > [image: image.png]
> > > > 分组计算的SQL如下:
> > > > [image: image.png]
> > > > 在执行计算时,报了如下异常:
> > > > Exception in thread "main" org.apache.flink.table.api.TableException:
> > > > GroupWindowAggregate doesn't support consuming update and delete
> > changes
> > > > which is produced by node TableSourceScan(table=[[default_catalog,
> > > > default_database, t_order,
> > watermark=[-(TO_TIMESTAMP(FROM_UNIXTIME(/($1,
> > > > 1000))), 3000:INTERVAL SECOND)]]], fields=[id, timestamps,
> > > > orderInformationId, userId, categoryId, productId, price,
> productCount,
> > > > priceSum, shipAddress, receiverAddress])
> > > > at
> > > >
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:380)
> > > > at
> > > >
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:298)
> > > > at
> > > >
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337)
> > > > at
> > > >
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326)
> > > > at
> > > >
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325)
> > > > at
> > > >
> > >
> >
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
> > > > at scala.collection.immutable.Range.foreach(Range.scala:155)
> > > > at scala.collection.TraversableLike.map(TraversableLike.scala:233)
> > > > at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
> > > > at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> > > > at
> > > >
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:325)
> > > > at
> > > >
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:275)
> > > > at
> > > >
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337)
> > > > at
> > > >
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326)
> > > > at
> > > >
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325)
> > > > at
> > > >
> > >
> >
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
> > > >
> > > > 我的初步理解是FlinkSQL 不支持这样CDC这样由插入、更新、删除的流的分组聚合。
> > > > 那面对我这样的情况,该用什么方案来解决?
> > > > 望知道的各位告知一下,感谢!
> > > >
> > > > 祝好
> > > >
> > > >
> > >
> >
>

Re: FlinkSQL CDC 窗口分组聚合求助

Posted by Jark Wu <im...@gmail.com>.
那是不是用非窗口聚合,开5s mini batch,是不是可以达到你的需求?

Best,
Jark

On Mon, 23 Nov 2020 at 13:16, jy l <lj...@gmail.com> wrote:

> 使用场景就是我们想用Flink 根据订单系统中的订单表,每5s钟计算一次总的交易金额。
> 目前我们的系统大致架构是mysql(debezium)---->kafka--->flink---->es
>
> Jark Wu <im...@gmail.com> 于2020年11月23日周一 上午10:35写道:
>
> > Flink SQL 的 window agg 目前不支持输入含有更新和删除消息。
> > 你可以使用非 window 聚合来代替。
> >
> > Btw,你可能说一下你的需求场景么? 为什么需要在  CDC 上做 window 操作呢?
> >
> > Best,
> > Jark
> >
> > On Mon, 23 Nov 2020 at 10:28, jy l <lj...@gmail.com> wrote:
> >
> > > Hi:
> > > 我使用CDC组件debezium将MySQL中的维表数据实时发送到kafka,然后使用flinkSQL消费kafka的数据,具体流程如下:
> > > [image: image.png]
> > > [image: image.png]
> > > 分组计算的SQL如下:
> > > [image: image.png]
> > > 在执行计算时,报了如下异常:
> > > Exception in thread "main" org.apache.flink.table.api.TableException:
> > > GroupWindowAggregate doesn't support consuming update and delete
> changes
> > > which is produced by node TableSourceScan(table=[[default_catalog,
> > > default_database, t_order,
> watermark=[-(TO_TIMESTAMP(FROM_UNIXTIME(/($1,
> > > 1000))), 3000:INTERVAL SECOND)]]], fields=[id, timestamps,
> > > orderInformationId, userId, categoryId, productId, price, productCount,
> > > priceSum, shipAddress, receiverAddress])
> > > at
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:380)
> > > at
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:298)
> > > at
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337)
> > > at
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326)
> > > at
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325)
> > > at
> > >
> >
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
> > > at scala.collection.immutable.Range.foreach(Range.scala:155)
> > > at scala.collection.TraversableLike.map(TraversableLike.scala:233)
> > > at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
> > > at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> > > at
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:325)
> > > at
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:275)
> > > at
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337)
> > > at
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326)
> > > at
> > >
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325)
> > > at
> > >
> >
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
> > >
> > > 我的初步理解是FlinkSQL 不支持这样CDC这样由插入、更新、删除的流的分组聚合。
> > > 那面对我这样的情况,该用什么方案来解决?
> > > 望知道的各位告知一下,感谢!
> > >
> > > 祝好
> > >
> > >
> >
>

Re: FlinkSQL CDC 窗口分组聚合求助

Posted by jy l <lj...@gmail.com>.
使用场景就是我们想用Flink 根据订单系统中的订单表,每5s钟计算一次总的交易金额。
目前我们的系统大致架构是mysql(debezium)---->kafka--->flink---->es

Jark Wu <im...@gmail.com> 于2020年11月23日周一 上午10:35写道:

> Flink SQL 的 window agg 目前不支持输入含有更新和删除消息。
> 你可以使用非 window 聚合来代替。
>
> Btw,你可能说一下你的需求场景么? 为什么需要在  CDC 上做 window 操作呢?
>
> Best,
> Jark
>
> On Mon, 23 Nov 2020 at 10:28, jy l <lj...@gmail.com> wrote:
>
> > Hi:
> > 我使用CDC组件debezium将MySQL中的维表数据实时发送到kafka,然后使用flinkSQL消费kafka的数据,具体流程如下:
> > [image: image.png]
> > [image: image.png]
> > 分组计算的SQL如下:
> > [image: image.png]
> > 在执行计算时,报了如下异常:
> > Exception in thread "main" org.apache.flink.table.api.TableException:
> > GroupWindowAggregate doesn't support consuming update and delete changes
> > which is produced by node TableSourceScan(table=[[default_catalog,
> > default_database, t_order, watermark=[-(TO_TIMESTAMP(FROM_UNIXTIME(/($1,
> > 1000))), 3000:INTERVAL SECOND)]]], fields=[id, timestamps,
> > orderInformationId, userId, categoryId, productId, price, productCount,
> > priceSum, shipAddress, receiverAddress])
> > at
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:380)
> > at
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:298)
> > at
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337)
> > at
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326)
> > at
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325)
> > at
> >
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
> > at scala.collection.immutable.Range.foreach(Range.scala:155)
> > at scala.collection.TraversableLike.map(TraversableLike.scala:233)
> > at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
> > at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> > at
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:325)
> > at
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:275)
> > at
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337)
> > at
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326)
> > at
> >
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325)
> > at
> >
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
> >
> > 我的初步理解是FlinkSQL 不支持这样CDC这样由插入、更新、删除的流的分组聚合。
> > 那面对我这样的情况,该用什么方案来解决?
> > 望知道的各位告知一下,感谢!
> >
> > 祝好
> >
> >
>

Re: FlinkSQL CDC 窗口分组聚合求助

Posted by Jark Wu <im...@gmail.com>.
我建了个 issue 跟进这个功能:https://issues.apache.org/jira/browse/FLINK-20281

On Mon, 23 Nov 2020 at 10:35, Jark Wu <im...@gmail.com> wrote:

> Flink SQL 的 window agg 目前不支持输入含有更新和删除消息。
> 你可以使用非 window 聚合来代替。
>
> Btw,你可能说一下你的需求场景么? 为什么需要在  CDC 上做 window 操作呢?
>
> Best,
> Jark
>
> On Mon, 23 Nov 2020 at 10:28, jy l <lj...@gmail.com> wrote:
>
>> Hi:
>> 我使用CDC组件debezium将MySQL中的维表数据实时发送到kafka,然后使用flinkSQL消费kafka的数据,具体流程如下:
>> [image: image.png]
>> [image: image.png]
>> 分组计算的SQL如下:
>> [image: image.png]
>> 在执行计算时,报了如下异常:
>> Exception in thread "main" org.apache.flink.table.api.TableException:
>> GroupWindowAggregate doesn't support consuming update and delete changes
>> which is produced by node TableSourceScan(table=[[default_catalog,
>> default_database, t_order, watermark=[-(TO_TIMESTAMP(FROM_UNIXTIME(/($1,
>> 1000))), 3000:INTERVAL SECOND)]]], fields=[id, timestamps,
>> orderInformationId, userId, categoryId, productId, price, productCount,
>> priceSum, shipAddress, receiverAddress])
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:380)
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:298)
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337)
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326)
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325)
>> at
>> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>> at scala.collection.immutable.Range.foreach(Range.scala:155)
>> at scala.collection.TraversableLike.map(TraversableLike.scala:233)
>> at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
>> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:325)
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:275)
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337)
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326)
>> at
>> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325)
>> at
>> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>>
>> 我的初步理解是FlinkSQL 不支持这样CDC这样由插入、更新、删除的流的分组聚合。
>> 那面对我这样的情况,该用什么方案来解决?
>> 望知道的各位告知一下,感谢!
>>
>> 祝好
>>
>>

Re: FlinkSQL CDC 窗口分组聚合求助

Posted by Jark Wu <im...@gmail.com>.
Flink SQL 的 window agg 目前不支持输入含有更新和删除消息。
你可以使用非 window 聚合来代替。

Btw,你可能说一下你的需求场景么? 为什么需要在  CDC 上做 window 操作呢?

Best,
Jark

On Mon, 23 Nov 2020 at 10:28, jy l <lj...@gmail.com> wrote:

> Hi:
> 我使用CDC组件debezium将MySQL中的维表数据实时发送到kafka,然后使用flinkSQL消费kafka的数据,具体流程如下:
> [image: image.png]
> [image: image.png]
> 分组计算的SQL如下:
> [image: image.png]
> 在执行计算时,报了如下异常:
> Exception in thread "main" org.apache.flink.table.api.TableException:
> GroupWindowAggregate doesn't support consuming update and delete changes
> which is produced by node TableSourceScan(table=[[default_catalog,
> default_database, t_order, watermark=[-(TO_TIMESTAMP(FROM_UNIXTIME(/($1,
> 1000))), 3000:INTERVAL SECOND)]]], fields=[id, timestamps,
> orderInformationId, userId, categoryId, productId, price, productCount,
> priceSum, shipAddress, receiverAddress])
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:380)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:298)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325)
> at
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
> at scala.collection.immutable.Range.foreach(Range.scala:155)
> at scala.collection.TraversableLike.map(TraversableLike.scala:233)
> at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:325)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:275)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visitChild(FlinkChangelogModeInferenceProgram.scala:337)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1(FlinkChangelogModeInferenceProgram.scala:326)
> at
> org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.$anonfun$visitChildren$1$adapted(FlinkChangelogModeInferenceProgram.scala:325)
> at
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
>
> 我的初步理解是FlinkSQL 不支持这样CDC这样由插入、更新、删除的流的分组聚合。
> 那面对我这样的情况,该用什么方案来解决?
> 望知道的各位告知一下,感谢!
>
> 祝好
>
>