Posted to issues@flink.apache.org by "Weijia Xu (Jira)" <ji...@apache.org> on 2021/01/25 08:22:00 UTC

[jira] [Commented] (FLINK-20947) Idle source doesn't work when pushing watermark into the source

    [ https://issues.apache.org/jira/browse/FLINK-20947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17271146#comment-17271146 ] 

Weijia Xu commented on FLINK-20947:
-----------------------------------

Hi [~jark] & [~fsk119], I just applied the latest fix and it works as we expected. Thanks for your effort.

> Idle source doesn't work when pushing watermark into the source
> ---------------------------------------------------------------
>
>                 Key: FLINK-20947
>                 URL: https://issues.apache.org/jira/browse/FLINK-20947
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.12.0
>            Reporter: Weijia Xu
>            Assignee: Shengkai Fang
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.13.0, 1.12.2
>
>
> I use Table SQL to create a stream from a Kafka source and write the data from Kafka into a partitioned Hive table.
> The following SQL creates the Kafka table:
> {code:java}
> // Kafka source table; the watermark trails eventTime by 15 seconds
> tableEnv.executeSql(
>     "CREATE TABLE stream_tmp.kafka_tmp (`messageId` STRING, `message_type` STRING,`payload` STRING,`timestamp` BIGINT, " +
>             "  procTime AS PROCTIME()," +
>             "  eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp` / 1000,'yyyy-MM-dd HH:mm:ss'))," +
>             "  WATERMARK FOR eventTime AS eventTime - INTERVAL '15' SECOND )" +
>             "  WITH ('connector' = 'kafka'," +
>             " 'topic' = 'XXX-topic'," +
>             " 'properties.bootstrap.servers'='kafka-server:9092'," +
>             " 'properties.group.id' = 'XXX-group_id'," +
>             " 'scan.startup.mode' = 'latest-offset'," +
>             " 'format' = 'json'," +
>             " 'json.fail-on-missing-field' = 'false'," +
>             " 'json.ignore-parse-errors' = 'true' )"
>             );{code}
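To make the watermark definition above concrete, here is a minimal Java sketch of what `WATERMARK FOR eventTime AS eventTime - INTERVAL '15' SECOND` produces per record. It assumes a single Kafka partition, works in epoch milliseconds, and ignores the session time-zone shift that `FROM_UNIXTIME` applies. `KafkaWatermarkSketch` and its methods are hypothetical illustrations, not Flink APIs: the watermark is the largest `eventTime - 15 s` seen so far, so a late record never moves it backwards.

{code:java}
import java.time.Instant;

/**
 * Hypothetical model of the Kafka table's watermark definition (not Flink code):
 *   eventTime = TO_TIMESTAMP(FROM_UNIXTIME(`timestamp` / 1000, 'yyyy-MM-dd HH:mm:ss'))
 *   WATERMARK FOR eventTime AS eventTime - INTERVAL '15' SECOND
 * Times are epoch milliseconds; the session time-zone shift is ignored.
 */
class KafkaWatermarkSketch {
    static final long OUT_OF_ORDERNESS_MS = 15_000L;

    // Largest watermark produced so far; watermarks never go backwards.
    private long currentWatermark = Long.MIN_VALUE;

    /** The 'yyyy-MM-dd HH:mm:ss' round trip drops milliseconds, so truncate to whole seconds. */
    static long eventTimeMillis(long timestampMillis) {
        return Math.floorDiv(timestampMillis, 1000L) * 1000L;
    }

    /** Feed one record's `timestamp` field and return the watermark afterwards. */
    long onRecord(long timestampMillis) {
        long candidate = eventTimeMillis(timestampMillis) - OUT_OF_ORDERNESS_MS;
        currentWatermark = Math.max(currentWatermark, candidate);
        return currentWatermark;
    }

    public static void main(String[] args) {
        KafkaWatermarkSketch source = new KafkaWatermarkSketch();
        // The second record is late: its candidate is lower, so the watermark stays put.
        long[] timestamps = {1_611_562_920_123L, 1_611_562_910_000L, 1_611_562_935_500L};
        for (long ts : timestamps) {
            long wm = source.onRecord(ts);
            System.out.println("eventTime=" + Instant.ofEpochMilli(eventTimeMillis(ts))
                    + " watermark=" + Instant.ofEpochMilli(wm));
        }
    }
}
{code}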
>   
> And the following SQL creates the Hive table:
> {code:java}
> // Hive sink table, partitioned by date/hour/minute and committed by partition time
> tableEnv.executeSql(
>      "CREATE TABLE hive_tmp.kafka_hive_tmp (`message_id` STRING,`message_type` STRING,`payload` STRING,`event_ts` BIGINT ) " +
>              " PARTITIONED BY (ts_date STRING,ts_hour STRING, ts_minute STRING)" +
>              " STORED AS PARQUET TBLPROPERTIES (" +
>              " 'sink.partition-commit.trigger' = 'partition-time'," +
>              " 'sink.partition-commit.delay' = '1 min'," +
>              " 'sink.partition-commit.policy.kind'='metastore,success-file'," +
>              " 'sink.partition-commit.success-file.name'='_SUCCESS'," +
>              " 'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00')");
> {code}
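The 'partition-time' trigger commits a partition once the watermark passes the time extracted from the partition values plus 'sink.partition-commit.delay'. The Java sketch below models that rule for this table's pattern '$ts_date $ts_hour:$ts_minute:00' and its 1-minute delay; `PartitionTimeCommitSketch` is a hypothetical name, and the strict "greater than" comparison follows my reading of the Flink documentation rather than Flink's source. It shows that whether the _SUCCESS file appears depends entirely on watermark progress.

{code:java}
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * Hypothetical model (not Flink's implementation) of the 'partition-time'
 * commit trigger configured on hive_tmp.kafka_hive_tmp.
 */
class PartitionTimeCommitSketch {
    // Format produced by 'partition.time-extractor.timestamp-pattern' = '$ts_date $ts_hour:$ts_minute:00'
    static final DateTimeFormatter EXTRACTED_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    // 'sink.partition-commit.delay' = '1 min'
    static final Duration COMMIT_DELAY = Duration.ofMinutes(1);

    /** Substitute the partition values into the pattern and parse the result. */
    static LocalDateTime partitionTime(String tsDate, String tsHour, String tsMinute) {
        String extracted = tsDate + " " + tsHour + ":" + tsMinute + ":00";
        return LocalDateTime.parse(extracted, EXTRACTED_FORMAT);
    }

    /** A partition becomes committable once the watermark passes partition time + delay. */
    static boolean isCommittable(LocalDateTime watermark, LocalDateTime partitionTime) {
        return watermark.isAfter(partitionTime.plus(COMMIT_DELAY));
    }

    public static void main(String[] args) {
        LocalDateTime partition = partitionTime("2021-01-25", "08", "22");
        // A watermark stuck behind an idle Kafka partition never gets past 08:23:00,
        // so this partition is never committed and no _SUCCESS file is written.
        System.out.println(isCommittable(LocalDateTime.parse("2021-01-25T08:22:45"), partition)); // false
        System.out.println(isCommittable(LocalDateTime.parse("2021-01-25T08:23:30"), partition)); // true
    }
}
{code}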
>  
> The Kafka topic used above has multiple partitions: some of them receive data, while others receive none.
> I noticed that the "_table.exec.source.idle-timeout_" option is meant to handle such "idle" source partitions, but even with this option set, the Hive partition commit is still not triggered (that is, the "_SUCCESS" file is never created for the partition).
> After analyzing the issue, I found the root cause: the watermark for the "idle" partition does not advance, so the Hive partition cannot be committed.
>  
>  
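The root cause described above can be illustrated with a simplified model of how a source combines per-partition watermarks: the watermark it emits is the minimum over its partitions, and "table.exec.source.idle-timeout" (default 0, which disables idleness detection) is supposed to exclude partitions that have received no records for that long. `IdlePartitionSketch` is a hypothetical Java illustration of that reasoning, not Flink's implementation and not the fix for this issue; processing time is passed in explicitly to keep it deterministic.

{code:java}
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model (not Flink's implementation) of combining per-partition
 * watermarks with an idle timeout. All times are milliseconds.
 */
class IdlePartitionSketch {
    private static final class Partition {
        long watermark = Long.MIN_VALUE; // nothing received yet
        long lastRecordAt;               // processing time of the latest record
        Partition(long createdAt) { this.lastRecordAt = createdAt; }
    }

    private final List<Partition> partitions = new ArrayList<>();
    private final long idleTimeoutMs; // 0 disables idleness detection

    IdlePartitionSketch(int numPartitions, long idleTimeoutMs, long now) {
        this.idleTimeoutMs = idleTimeoutMs;
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new Partition(now));
        }
    }

    /** A record arrives on one partition, carrying that partition's new watermark. */
    void onRecord(int partition, long watermark, long now) {
        Partition p = partitions.get(partition);
        p.watermark = Math.max(p.watermark, watermark);
        p.lastRecordAt = now;
    }

    /**
     * Minimum watermark over non-idle partitions. Returns Long.MIN_VALUE if a
     * non-idle partition has no watermark yet or if every partition is idle
     * (this sketch's stand-in for "no progress").
     */
    long combinedWatermark(long now) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Partition p : partitions) {
            boolean idle = idleTimeoutMs > 0 && now - p.lastRecordAt >= idleTimeoutMs;
            if (!idle) {
                min = Math.min(min, p.watermark);
                anyActive = true;
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        // Two Kafka partitions; only partition 0 ever receives data.
        IdlePartitionSketch withoutTimeout = new IdlePartitionSketch(2, 0L, 0L);
        IdlePartitionSketch withTimeout = new IdlePartitionSketch(2, 10_000L, 0L);
        withoutTimeout.onRecord(0, 60_000L, 2_000L);
        withTimeout.onRecord(0, 60_000L, 2_000L);
        System.out.println(withoutTimeout.combinedWatermark(11_000L)); // -9223372036854775808: held back by partition 1
        System.out.println(withTimeout.combinedWatermark(11_000L));    // 60000: partition 1 is treated as idle
    }
}
{code}

With the timeout disabled, the silent partition pins the combined watermark at its initial value indefinitely, which is exactly the situation that keeps the Hive partition from ever being committed.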



--
This message was sent by Atlassian Jira
(v8.3.4#803005)