You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by "zhaorui_9303@163.com" <zh...@163.com> on 2019/12/26 04:03:10 UTC

source并行度不同导致任务没有数据落地

hi all:
       最近碰到一个很头疼的事情,两个任务相同的sql语句不同的source,任务的并行度为8,一个source是kafka一个source是rabbitmq,kafka和rabbitmq中加载相同的数据后,source为rabbitmq的任务有数据落地,source为kafka的任务运行好几次都不见有数据落地。因为sql中涉及到了窗口,所以考虑过kafka多partition对数据读取顺序的影响,将所有数据都加载到kafka的同一个partition中重启任务后发现还是没有数据落地。考虑到这两个任务唯一的不同点就是源为rabbitmq的任务source算子的并行度为1,所以将源为kafka的任务的source并行度也设为1,运行任务后发现有数据落地了。source并行度的改变应该只是改变了一下source与其它算子之间的数据传递方式,这种改变会对最终的结果造成影响吗?有没有大佬碰到过相同的问题?
    flink版本1.9.1
    sql:select count(ps_comment) col1,ceil(stddev_pop(ps_availqty)) col2,
       tumble_start(over_time,interval '72' hour) col3,
       tumble_end(over_time,interval '72' hour) col4,
       ps_date
from cirrostream_kafka_ck_source_03_8x3
where ps_availqty <= 489
  and ps_supplycost > 998
  and ps_comment not like '%ff%'
  and ps_partkey <= 3751122
   or ps_suppkey = 723
group by ps_date,ps_availqty,tumble(over_time,interval '72' hour)
having min(ps_partkey) not in (3525711,3738707,3740245)



zhaorui_9303@163.com

Re: source并行度不同导致任务没有数据落地

Posted by JingsongLee <lz...@aliyun.com.INVALID>.
Hi zhaorui,

你是不是指定了Rowtime的列?
如果指定了,Kafka是否有的通道一直没有数据或者数据没有前进?

Window的输出触发是需要watermark前进的,这也就需要你的每个通道都有数据在时间上前进,也就是说每个Kafka的通道都需要有最新时间点的数据源源不断的来。

你设置成一个并发,那就只要一个通道有数据就可以了,所以绕过了这个问题。

Best,
Jingsong Lee


------------------------------------------------------------------
From:zhaorui_9303@163.com <zh...@163.com>
Send Time:2019年12月26日(星期四) 12:03
To:user-zh <us...@flink.apache.org>
Subject:source并行度不同导致任务没有数据落地

hi all:
       最近碰到一个很头疼的事情,两个任务相同的sql语句不同的source,任务的并行度为8,一个source是kafka一个source是rabbitmq,kafka和rabbitmq中加载相同的数据后,source为rabbitmq的任务有数据落地,source为kafka的任务运行好几次都不见有数据落地。因为sql中涉及到了窗口,所以考虑过kafka多partition对数据读取顺序的影响,将所有数据都加载到kafka的同一个partition中重启任务后发现还是没有数据落地。考虑到这两个任务唯一的不同点就是源为rabbitmq的任务source算子的并行度为1,所以将源为kafka的任务的source并行度也设为1,运行任务后发现有数据落地了。source并行度的改变应该只是改变了一下source与其它算子之间的数据传递方式,这种改变会对最终的结果造成影响吗?有没有大佬碰到过相同的问题?
    flink版本1.9.1
    sql:select count(ps_comment) col1,ceil(stddev_pop(ps_availqty)) col2,
       tumble_start(over_time,interval '72' hour) col3,
       tumble_end(over_time,interval '72' hour) col4,
       ps_date
from cirrostream_kafka_ck_source_03_8x3
where ps_availqty <= 489
  and ps_supplycost > 998
  and ps_comment not like '%ff%'
  and ps_partkey <= 3751122
   or ps_suppkey = 723
group by ps_date,ps_availqty,tumble(over_time,interval '72' hour)
having min(ps_partkey) not in (3525711,3738707,3740245)



zhaorui_9303@163.com