You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by "wind.fly.vip@outlook.com" <wi...@outlook.com> on 2020/07/13 06:59:19 UTC
回复: flink1.10.1 flink sql消费kafka当parallelism大于1时不产生watermark
Hi,
确实是一共三个分区,只有一个分区有数据,已经解决,谢谢。
Best,
Junbao Zhang
________________________________
发件人: Leonard Xu <xb...@gmail.com>
发送时间: 2020年7月13日 11:57
收件人: user-zh <us...@flink.apache.org>
主题: Re: flink1.10.1 flink sql消费kafka当parallelism大于1时不产生watermark
Hi,
可以先看下 Kafka topic 对应的partition有几个?是否每个分区都有数据。
祝好,
Leonard Xu
> 在 2020年7月13日,11:46,wind.fly.vip@outlook.com 写道:
>
> Hi, all:
> 本人使用的flink版本为flink 1.10.1, flink sql消费kafka, 当parallelism为1时正常运行,但将parallelism修改为2时,在yarn-session web页面看不到watermark的指标信息了,也没有计算结果输出,sql如下:
> insert into
> x.report.bi_report_fence_common_indicators
> select
> fence_id,
> 'finishedOrderCnt' as indicator_name,
> TUMBLE_END(dt, INTERVAL '5' MINUTE) as ts,
> count(1) as indicator_val
> from
> (
> select
> dt,
> fence_id,
> fence_coordinates_array,
> c.driver_location
> from
> (
> select
> *
> from
> (
> select
> dt,
> driver_location,
> r1.f1.fence_info as fence_info
> from
> (
> select
> o.dt,
> o.driver_location,
> MD5(r.city_code) as k,
> PROCTIME() as proctime
> from
> (
> select
> order_no,
> dt,
> driver_location,
> PROCTIME() as proctime
> from
> x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner
> where
> _type = 'insert'
> and event_code = 'arriveAndSettlement'
> ) o
> LEFT JOIN x.dim.saic_trip_create_t_order FOR SYSTEM_TIME AS OF o.proctime AS r ON r.order_no = o.order_no
> ) o1
> LEFT JOIN x.dim.fence_info FOR SYSTEM_TIME AS OF o1.proctime AS r1 ON r1.k = o1.k
> ) a
> where
> fence_info is not null
> ) c
> LEFT JOIN LATERAL TABLE(fence_split(c.fence_info)) as T(fence_id, fence_coordinates_array) ON TRUE
> ) as b
> where
> in_fence(fence_coordinates_array, driver_location)
> group by
> TUMBLE(dt, INTERVAL '5' MINUTE),
> fence_id;
> 其中 x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner表中dt为watermark字段,建表语句如下:
> CREATE TABLE x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner(
> _type STRING,
> _old_id BIGINT,
> id BIGINT,
> _old_order_no STRING,
> order_no STRING,
> _old_event_code STRING,
> event_code STRING,
> _old_from_state TINYINT,
> from_state TINYINT,
> _old_to_state TINYINT,
> to_state TINYINT,
> _old_operator_type TINYINT,
> operator_type TINYINT,
> _old_passenger_location STRING,
> passenger_location STRING,
> _old_driver_location STRING,
> driver_location STRING,
> _old_trans_time STRING,
> trans_time STRING,
> _old_create_time STRING,
> create_time STRING,
> _old_update_time STRING,
> update_time STRING,
> _old_passenger_poi_address STRING,
> passenger_poi_address STRING,
> _old_passenger_detail_address STRING,
> passenger_detail_address STRING,
> _old_driver_poi_address STRING,
> driver_poi_address STRING,
> _old_driver_detail_address STRING,
> driver_detail_address STRING,
> _old_operator STRING,
> operator STRING,
> _old_partition_index TINYINT,
> partition_index TINYINT,
> dt as TO_TIMESTAMP(trans_time),
> WATERMARK FOR dt AS dt - INTERVAL '5' SECOND
> ) WITH (
> 'connector.type' = 'kafka',
> 'connector.properties.bootstrap.servers' = '*',
> 'connector.properties.zookeeper.connect' = '*',
> 'connector.version' = 'universal',
> 'format.type' = 'json',
> 'connector.properties.group.id' = 'testGroup',
> 'connector.startup-mode' = 'group-offsets',
> 'connector.topic' = 'xxxxx'
> )