You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by "wind.fly.vip@outlook.com" <wi...@outlook.com> on 2020/07/13 03:46:02 UTC
flink1.10.1 flink sql消费kafka当parallelism大于1时不产生watermark
Hi, all:
本人使用的flink版本为flink 1.10.1, flink sql消费kafka, 当parallelism为1时正常运行,但将parallelism修改为2时,在yarn-session web页面看不到watermark的指标信息了,也没有计算结果输出,sql如下:
insert into
x.report.bi_report_fence_common_indicators
select
fence_id,
'finishedOrderCnt' as indicator_name,
TUMBLE_END(dt, INTERVAL '5' MINUTE) as ts,
count(1) as indicator_val
from
(
select
dt,
fence_id,
fence_coordinates_array,
c.driver_location
from
(
select
*
from
(
select
dt,
driver_location,
r1.f1.fence_info as fence_info
from
(
select
o.dt,
o.driver_location,
MD5(r.city_code) as k,
PROCTIME() as proctime
from
(
select
order_no,
dt,
driver_location,
PROCTIME() as proctime
from
x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner
where
_type = 'insert'
and event_code = 'arriveAndSettlement'
) o
LEFT JOIN x.dim.saic_trip_create_t_order FOR SYSTEM_TIME AS OF o.proctime AS r ON r.order_no = o.order_no
) o1
LEFT JOIN x.dim.fence_info FOR SYSTEM_TIME AS OF o1.proctime AS r1 ON r1.k = o1.k
) a
where
fence_info is not null
) c
LEFT JOIN LATERAL TABLE(fence_split(c.fence_info)) as T(fence_id, fence_coordinates_array) ON TRUE
) as b
where
in_fence(fence_coordinates_array, driver_location)
group by
TUMBLE(dt, INTERVAL '5' MINUTE),
fence_id;
其中 x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner表中dt为watermark字段,建表语句如下:
CREATE TABLE x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner(
_type STRING,
_old_id BIGINT,
id BIGINT,
_old_order_no STRING,
order_no STRING,
_old_event_code STRING,
event_code STRING,
_old_from_state TINYINT,
from_state TINYINT,
_old_to_state TINYINT,
to_state TINYINT,
_old_operator_type TINYINT,
operator_type TINYINT,
_old_passenger_location STRING,
passenger_location STRING,
_old_driver_location STRING,
driver_location STRING,
_old_trans_time STRING,
trans_time STRING,
_old_create_time STRING,
create_time STRING,
_old_update_time STRING,
update_time STRING,
_old_passenger_poi_address STRING,
passenger_poi_address STRING,
_old_passenger_detail_address STRING,
passenger_detail_address STRING,
_old_driver_poi_address STRING,
driver_poi_address STRING,
_old_driver_detail_address STRING,
driver_detail_address STRING,
_old_operator STRING,
operator STRING,
_old_partition_index TINYINT,
partition_index TINYINT,
dt as TO_TIMESTAMP(trans_time),
WATERMARK FOR dt AS dt - INTERVAL '5' SECOND
) WITH (
'connector.type' = 'kafka',
'connector.properties.bootstrap.servers' = '*',
'connector.properties.zookeeper.connect' = '*',
'connector.version' = 'universal',
'format.type' = 'json',
'connector.properties.group.id' = 'testGroup',
'connector.startup-mode' = 'group-offsets',
'connector.topic' = 'xxxxx'
)
Re: flink1.10.1 flink sql消费kafka当parallelism大于1时不产生watermark
Posted by zilong xiao <ac...@gmail.com>.
topic是几个分区呢?如果是一个分区,要加一个rebalance参数吧?
wind.fly.vip@outlook.com <wi...@outlook.com> 于2020年7月13日周一 上午11:46写道:
> Hi, all:
> 本人使用的flink版本为flink 1.10.1, flink sql消费kafka,
> 当parallelism为1时正常运行,但将parallelism修改为2时,在yarn-session
> web页面看不到watermark的指标信息了,也没有计算结果输出,sql如下:
> insert into
> x.report.bi_report_fence_common_indicators
> select
> fence_id,
> 'finishedOrderCnt' as indicator_name,
> TUMBLE_END(dt, INTERVAL '5' MINUTE) as ts,
> count(1) as indicator_val
> from
> (
> select
> dt,
> fence_id,
> fence_coordinates_array,
> c.driver_location
> from
> (
> select
> *
> from
> (
> select
> dt,
> driver_location,
> r1.f1.fence_info as fence_info
> from
> (
> select
> o.dt,
> o.driver_location,
> MD5(r.city_code) as k,
> PROCTIME() as proctime
> from
> (
> select
> order_no,
> dt,
> driver_location,
> PROCTIME() as proctime
> from
> x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner
> where
> _type = 'insert'
> and event_code = 'arriveAndSettlement'
> ) o
> LEFT JOIN x.dim.saic_trip_create_t_order FOR SYSTEM_TIME
> AS OF o.proctime AS r ON r.order_no = o.order_no
> ) o1
> LEFT JOIN x.dim.fence_info FOR SYSTEM_TIME AS OF o1.proctime
> AS r1 ON r1.k = o1.k
> ) a
> where
> fence_info is not null
> ) c
> LEFT JOIN LATERAL TABLE(fence_split(c.fence_info)) as T(fence_id,
> fence_coordinates_array) ON TRUE
> ) as b
> where
> in_fence(fence_coordinates_array, driver_location)
> group by
> TUMBLE(dt, INTERVAL '5' MINUTE),
> fence_id;
> 其中
> x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner表中dt为watermark字段,建表语句如下:
> CREATE TABLE
> x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner(
> _type STRING,
> _old_id BIGINT,
> id BIGINT,
> _old_order_no STRING,
> order_no STRING,
> _old_event_code STRING,
> event_code STRING,
> _old_from_state TINYINT,
> from_state TINYINT,
> _old_to_state TINYINT,
> to_state TINYINT,
> _old_operator_type TINYINT,
> operator_type TINYINT,
> _old_passenger_location STRING,
> passenger_location STRING,
> _old_driver_location STRING,
> driver_location STRING,
> _old_trans_time STRING,
> trans_time STRING,
> _old_create_time STRING,
> create_time STRING,
> _old_update_time STRING,
> update_time STRING,
> _old_passenger_poi_address STRING,
> passenger_poi_address STRING,
> _old_passenger_detail_address STRING,
> passenger_detail_address STRING,
> _old_driver_poi_address STRING,
> driver_poi_address STRING,
> _old_driver_detail_address STRING,
> driver_detail_address STRING,
> _old_operator STRING,
> operator STRING,
> _old_partition_index TINYINT,
> partition_index TINYINT,
> dt as TO_TIMESTAMP(trans_time),
> WATERMARK FOR dt AS dt - INTERVAL '5' SECOND
> ) WITH (
> 'connector.type' = 'kafka',
> 'connector.properties.bootstrap.servers' = '*',
> 'connector.properties.zookeeper.connect' = '*',
> 'connector.version' = 'universal',
> 'format.type' = 'json',
> 'connector.properties.group.id' = 'testGroup',
> 'connector.startup-mode' = 'group-offsets',
> 'connector.topic' = 'xxxxx'
> )
>
回复: flink1.10.1 flink sql消费kafka当parallelism大于1时不产生watermark
Posted by "wind.fly.vip@outlook.com" <wi...@outlook.com>.
Hi,
确实是一共三个分区,只有一个分区有数据,已经解决,谢谢。
Best,
Junbao Zhang
________________________________
发件人: Leonard Xu <xb...@gmail.com>
发送时间: 2020年7月13日 11:57
收件人: user-zh <us...@flink.apache.org>
主题: Re: flink1.10.1 flink sql消费kafka当parallelism大于1时不产生watermark
Hi,
可以先看下 Kafka topic 对应的partition有几个?是否每个分区都有数据。
祝好,
Leonard Xu
> 在 2020年7月13日,11:46,wind.fly.vip@outlook.com 写道:
>
> Hi, all:
> 本人使用的flink版本为flink 1.10.1, flink sql消费kafka, 当parallelism为1时正常运行,但将parallelism修改为2时,在yarn-session web页面看不到watermark的指标信息了,也没有计算结果输出,sql如下:
> insert into
> x.report.bi_report_fence_common_indicators
> select
> fence_id,
> 'finishedOrderCnt' as indicator_name,
> TUMBLE_END(dt, INTERVAL '5' MINUTE) as ts,
> count(1) as indicator_val
> from
> (
> select
> dt,
> fence_id,
> fence_coordinates_array,
> c.driver_location
> from
> (
> select
> *
> from
> (
> select
> dt,
> driver_location,
> r1.f1.fence_info as fence_info
> from
> (
> select
> o.dt,
> o.driver_location,
> MD5(r.city_code) as k,
> PROCTIME() as proctime
> from
> (
> select
> order_no,
> dt,
> driver_location,
> PROCTIME() as proctime
> from
> x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner
> where
> _type = 'insert'
> and event_code = 'arriveAndSettlement'
> ) o
> LEFT JOIN x.dim.saic_trip_create_t_order FOR SYSTEM_TIME AS OF o.proctime AS r ON r.order_no = o.order_no
> ) o1
> LEFT JOIN x.dim.fence_info FOR SYSTEM_TIME AS OF o1.proctime AS r1 ON r1.k = o1.k
> ) a
> where
> fence_info is not null
> ) c
> LEFT JOIN LATERAL TABLE(fence_split(c.fence_info)) as T(fence_id, fence_coordinates_array) ON TRUE
> ) as b
> where
> in_fence(fence_coordinates_array, driver_location)
> group by
> TUMBLE(dt, INTERVAL '5' MINUTE),
> fence_id;
> 其中 x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner表中dt为watermark字段,建表语句如下:
> CREATE TABLE x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner(
> _type STRING,
> _old_id BIGINT,
> id BIGINT,
> _old_order_no STRING,
> order_no STRING,
> _old_event_code STRING,
> event_code STRING,
> _old_from_state TINYINT,
> from_state TINYINT,
> _old_to_state TINYINT,
> to_state TINYINT,
> _old_operator_type TINYINT,
> operator_type TINYINT,
> _old_passenger_location STRING,
> passenger_location STRING,
> _old_driver_location STRING,
> driver_location STRING,
> _old_trans_time STRING,
> trans_time STRING,
> _old_create_time STRING,
> create_time STRING,
> _old_update_time STRING,
> update_time STRING,
> _old_passenger_poi_address STRING,
> passenger_poi_address STRING,
> _old_passenger_detail_address STRING,
> passenger_detail_address STRING,
> _old_driver_poi_address STRING,
> driver_poi_address STRING,
> _old_driver_detail_address STRING,
> driver_detail_address STRING,
> _old_operator STRING,
> operator STRING,
> _old_partition_index TINYINT,
> partition_index TINYINT,
> dt as TO_TIMESTAMP(trans_time),
> WATERMARK FOR dt AS dt - INTERVAL '5' SECOND
> ) WITH (
> 'connector.type' = 'kafka',
> 'connector.properties.bootstrap.servers' = '*',
> 'connector.properties.zookeeper.connect' = '*',
> 'connector.version' = 'universal',
> 'format.type' = 'json',
> 'connector.properties.group.id' = 'testGroup',
> 'connector.startup-mode' = 'group-offsets',
> 'connector.topic' = 'xxxxx'
> )
Re: flink1.10.1 flink sql消费kafka当parallelism大于1时不产生watermark
Posted by Leonard Xu <xb...@gmail.com>.
Hi,
可以先看下 Kafka topic 对应的partition有几个?是否每个分区都有数据。
祝好,
Leonard Xu
> 在 2020年7月13日,11:46,wind.fly.vip@outlook.com 写道:
>
> Hi, all:
> 本人使用的flink版本为flink 1.10.1, flink sql消费kafka, 当parallelism为1时正常运行,但将parallelism修改为2时,在yarn-session web页面看不到watermark的指标信息了,也没有计算结果输出,sql如下:
> insert into
> x.report.bi_report_fence_common_indicators
> select
> fence_id,
> 'finishedOrderCnt' as indicator_name,
> TUMBLE_END(dt, INTERVAL '5' MINUTE) as ts,
> count(1) as indicator_val
> from
> (
> select
> dt,
> fence_id,
> fence_coordinates_array,
> c.driver_location
> from
> (
> select
> *
> from
> (
> select
> dt,
> driver_location,
> r1.f1.fence_info as fence_info
> from
> (
> select
> o.dt,
> o.driver_location,
> MD5(r.city_code) as k,
> PROCTIME() as proctime
> from
> (
> select
> order_no,
> dt,
> driver_location,
> PROCTIME() as proctime
> from
> x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner
> where
> _type = 'insert'
> and event_code = 'arriveAndSettlement'
> ) o
> LEFT JOIN x.dim.saic_trip_create_t_order FOR SYSTEM_TIME AS OF o.proctime AS r ON r.order_no = o.order_no
> ) o1
> LEFT JOIN x.dim.fence_info FOR SYSTEM_TIME AS OF o1.proctime AS r1 ON r1.k = o1.k
> ) a
> where
> fence_info is not null
> ) c
> LEFT JOIN LATERAL TABLE(fence_split(c.fence_info)) as T(fence_id, fence_coordinates_array) ON TRUE
> ) as b
> where
> in_fence(fence_coordinates_array, driver_location)
> group by
> TUMBLE(dt, INTERVAL '5' MINUTE),
> fence_id;
> 其中 x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner表中dt为watermark字段,建表语句如下:
> CREATE TABLE x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner(
> _type STRING,
> _old_id BIGINT,
> id BIGINT,
> _old_order_no STRING,
> order_no STRING,
> _old_event_code STRING,
> event_code STRING,
> _old_from_state TINYINT,
> from_state TINYINT,
> _old_to_state TINYINT,
> to_state TINYINT,
> _old_operator_type TINYINT,
> operator_type TINYINT,
> _old_passenger_location STRING,
> passenger_location STRING,
> _old_driver_location STRING,
> driver_location STRING,
> _old_trans_time STRING,
> trans_time STRING,
> _old_create_time STRING,
> create_time STRING,
> _old_update_time STRING,
> update_time STRING,
> _old_passenger_poi_address STRING,
> passenger_poi_address STRING,
> _old_passenger_detail_address STRING,
> passenger_detail_address STRING,
> _old_driver_poi_address STRING,
> driver_poi_address STRING,
> _old_driver_detail_address STRING,
> driver_detail_address STRING,
> _old_operator STRING,
> operator STRING,
> _old_partition_index TINYINT,
> partition_index TINYINT,
> dt as TO_TIMESTAMP(trans_time),
> WATERMARK FOR dt AS dt - INTERVAL '5' SECOND
> ) WITH (
> 'connector.type' = 'kafka',
> 'connector.properties.bootstrap.servers' = '*',
> 'connector.properties.zookeeper.connect' = '*',
> 'connector.version' = 'universal',
> 'format.type' = 'json',
> 'connector.properties.group.id' = 'testGroup',
> 'connector.startup-mode' = 'group-offsets',
> 'connector.topic' = 'xxxxx'
> )