Posted to user-zh@flink.apache.org by "wind.fly.vip@outlook.com" <wi...@outlook.com> on 2020/07/13 06:59:19 UTC

Re: flink1.10.1 flink sql consuming Kafka: no watermark generated when parallelism > 1

Hi,
    Confirmed: the topic indeed has three partitions and only one of them has data. Problem solved, thank you.

Best,
Junbao Zhang
________________________________
From: Leonard Xu <xb...@gmail.com>
Sent: July 13, 2020 11:57
To: user-zh <us...@flink.apache.org>
Subject: Re: flink1.10.1 flink sql consuming Kafka: no watermark generated when parallelism > 1

Hi,

You can start by checking how many partitions the Kafka topic has, and whether every partition actually receives data.
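Some background on why an empty partition matters: a multi-partition Kafka source emits as its output watermark the minimum of the per-partition watermarks, so a partition that never sees a record keeps the combined watermark at its initial value and the TUMBLE windows downstream never fire. A minimal sketch of that mechanism in plain Python (not Flink API; the timestamps are made up for illustration):

```python
# Sketch of how a multi-partition source combines watermarks:
# the operator forwards the MINIMUM of the per-partition watermarks,
# so one empty partition pins the result at its initial value.
LONG_MIN = -(2**63)  # initial watermark before any record arrives

def combined_watermark(partition_watermarks):
    """Watermark emitted downstream = min over all partitions."""
    return min(partition_watermarks)

# Three partitions, data only in partition 0 (the situation in this thread):
wm = [1_594_600_000_000, LONG_MIN, LONG_MIN]
print(combined_watermark(wm))  # stays pinned at LONG_MIN, so no window output
```

If I remember correctly, Flink 1.11+ adds a `table.exec.source.idle-timeout` option that marks such partitions as idle so the other partitions' watermarks can advance; on 1.10 the usual fix is to make sure every partition receives data.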

Best regards,
Leonard Xu

> On Jul 13, 2020, at 11:46, wind.fly.vip@outlook.com wrote:
>
> Hi, all:
> I'm using Flink 1.10.1 with Flink SQL consuming from Kafka. With parallelism set to 1 it runs fine, but after changing parallelism to 2, the yarn-session web UI no longer shows any watermark metrics and no results are produced. The SQL is as follows:
> insert into
>  x.report.bi_report_fence_common_indicators
> select
>  fence_id,
>  'finishedOrderCnt' as indicator_name,
>  TUMBLE_END(dt, INTERVAL '5' MINUTE) as ts,
>  count(1) as indicator_val
> from
>  (
>    select
>      dt,
>      fence_id,
>      fence_coordinates_array,
>      c.driver_location
>    from
>      (
>        select
>          *
>        from
>          (
>            select
>              dt,
>              driver_location,
>              r1.f1.fence_info as fence_info
>            from
>              (
>                select
>                  o.dt,
>                  o.driver_location,
>                  MD5(r.city_code) as k,
>                  PROCTIME() as proctime
>                from
>                  (
>                    select
>                      order_no,
>                      dt,
>                      driver_location,
>                      PROCTIME() as proctime
>                    from
>                      x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner
>                    where
>                      _type = 'insert'
>                      and event_code = 'arriveAndSettlement'
>                  ) o
>                  LEFT JOIN x.dim.saic_trip_create_t_order FOR SYSTEM_TIME AS OF o.proctime AS r ON r.order_no = o.order_no
>              ) o1
>              LEFT JOIN x.dim.fence_info FOR SYSTEM_TIME AS OF o1.proctime AS r1 ON r1.k = o1.k
>          ) a
>        where
>          fence_info is not null
>      ) c
>      LEFT JOIN LATERAL TABLE(fence_split(c.fence_info)) as T(fence_id, fence_coordinates_array) ON TRUE
>  ) as b
> where
>  in_fence(fence_coordinates_array, driver_location)
> group by
>  TUMBLE(dt, INTERVAL '5' MINUTE),
>  fence_id;
>           In the table x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner, dt is the watermark column; the DDL is:
>           CREATE TABLE x.ods.ods_binlog_saic_trip_order2_t_order_trans_inner(
>  _type STRING,
>  _old_id BIGINT,
>  id BIGINT,
>  _old_order_no STRING,
>  order_no STRING,
>  _old_event_code STRING,
>  event_code STRING,
>  _old_from_state TINYINT,
>  from_state TINYINT,
>  _old_to_state TINYINT,
>  to_state TINYINT,
>  _old_operator_type TINYINT,
>  operator_type TINYINT,
>  _old_passenger_location STRING,
>  passenger_location STRING,
>  _old_driver_location STRING,
>  driver_location STRING,
>  _old_trans_time STRING,
>  trans_time STRING,
>  _old_create_time STRING,
>  create_time STRING,
>  _old_update_time STRING,
>  update_time STRING,
>  _old_passenger_poi_address STRING,
>  passenger_poi_address STRING,
>  _old_passenger_detail_address STRING,
>  passenger_detail_address STRING,
>  _old_driver_poi_address STRING,
>  driver_poi_address STRING,
>  _old_driver_detail_address STRING,
>  driver_detail_address STRING,
>  _old_operator STRING,
>  operator STRING,
>  _old_partition_index TINYINT,
>  partition_index TINYINT,
>  dt as TO_TIMESTAMP(trans_time),
>  WATERMARK FOR dt AS dt - INTERVAL '5' SECOND
> ) WITH (
>  'connector.type' = 'kafka',
>  'connector.properties.bootstrap.servers' = '*',
>  'connector.properties.zookeeper.connect' = '*',
>  'connector.version' = 'universal',
>  'format.type' = 'json',
>  'connector.properties.group.id' = 'testGroup',
>  'connector.startup-mode' = 'group-offsets',
>  'connector.topic' = 'xxxxx'
> )