You are viewing a plain text version of this content. The canonical link for it is here.
Posted to by Jie Wong <> on 2021/01/21 08:29:11 UTC

flink sql升级到1.12 基于eventtime的window聚合没有watermark导致没有数据输出

String ddl =
        "CREATE TABLE orders (\n"
                + "  user_id INT,\n"
                + "  product STRING,\n"
                + "  amount INT,\n"
                + "  `time` bigint,\n"
                + "  `ts` AS TO_TIMESTAMP(FROM_UNIXTIME(`time`)),\n"
                + "  WATERMARK FOR ts AS ts - INTERVAL '3' SECOND\n"
                + ") WITH (\n"
                + "    'connector' = 'kafka',\n"
                + "    'topic' = 'test',\n"
                + "    '' = 'flink_ods1',\n"
                + "    'properties.bootstrap.servers' = ‘‘\n"
                + "    'format' = 'csv',\n"
                + "    'csv.ignore-parse-errors' = 'true',\n"
                + "    'csv.field-delimiter' = ',',\n"
                + "    'scan.startup.mode' = 'group-offsets'\n"
                + ")";

String ddl2 = "CREATE TABLE print_check2 (\n"
        + "    `window_start` STRING,\n"
        + "    `order_num` BIGINT,\n"
        + "    `total_amount` BIGINT,\n"
        + "    `unique_products` BIGINT\n"
        + ") WITH (\n"
        + "    'connector' = 'print'\n"
        + ")";

String ddl3 =
        "insert into print_check2 SELECT\n"
                + "  CAST(TUMBLE_START(ts, INTERVAL '5' SECOND) AS
STRING) window_start,\n"
                + "  COUNT(*) order_num,\n"
                + "  SUM(amount) total_amount,\n"
                + "  COUNT(DISTINCT product) unique_products\n"
                + "FROM orders\n"
                + "GROUP BY TUMBLE(ts, INTERVAL '5' SECOND)";
