Posted to user@flink.apache.org by Lu Weizheng <lu...@hotmail.com> on 2020/03/03 02:08:46 UTC

Table API connect method timestamp watermark assignment problem

Hey guys,

I have recently started using the Flink Table API. I want to use event time with the Kafka table connector, but Flink does not seem to recognize the event-time timestamp field in my code. Here is my code:

public static void main(String[] args) throws Exception {

        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, fsSettings);

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        tEnv
            // Use connect() to connect to the external system
            .connect(
                new Kafka()
                .version("universal")     // Required; valid values are "0.8", "0.9", "0.10", "0.11", or "universal"
                .topic("user_behavior")   // Required; topic name
                .startFromLatest()        // Where to start reading on first consumption
                .property("zookeeper.connect", "localhost:2181")  // Kafka connection properties
                .property("bootstrap.servers", "localhost:9092")
            )
            // Serialization format; can be JSON, Avro, etc.
            .withFormat(new Json())
            // Schema of the data
            .withSchema(
                new Schema()
                    .field("user_id", DataTypes.BIGINT())
                    .field("item_id", DataTypes.BIGINT())
                    .field("category_id", DataTypes.BIGINT())
                    .field("behavior", DataTypes.STRING())
                    .field("ts", DataTypes.TIMESTAMP(3))
                    .rowtime(new Rowtime().timestampsFromField("ts").watermarksPeriodicAscending())
            )
            // Name of the temporary table; it can be used in later SQL statements
            .createTemporaryTable("user_behavior");

        Table tumbleGroupByUserId = tEnv.sqlQuery("SELECT \n" +
                "\tuser_id, \n" +
                "\tCOUNT(behavior) AS behavior_cnt, \n" +
                "\tTUMBLE_END(ts, INTERVAL '10' SECOND) AS end_ts \n" +
                "FROM user_behavior\n" +
                "GROUP BY user_id, TUMBLE(ts, INTERVAL '10' SECOND)");
        DataStream<Tuple2<Boolean, Row>> result = tEnv.toRetractStream(tumbleGroupByUserId, Row.class);
        result.print();

        env.execute("table api");
    }

As shown in the code above, I call rowtime() when defining the Schema. When I run the job, I get the following error: Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered.
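For context on what the query's TUMBLE(ts, INTERVAL '10' SECOND) grouping is expected to do once ts is a proper time attribute, here is a minimal plain-Java sketch of tumbling-window assignment. It is not Flink code: the class and method names are hypothetical, and it assumes epoch-millisecond timestamps with no window offset.

```java
import java.time.Duration;

// Hypothetical illustration only; this is not part of the Flink API.
final class TumbleSketch {

    // Start (inclusive) of the tumbling window containing tsMillis, assuming zero offset.
    static long windowStart(long tsMillis, long sizeMillis) {
        return tsMillis - Math.floorMod(tsMillis, sizeMillis);
    }

    // End (exclusive) of that window; conceptually the value TUMBLE_END reports.
    static long windowEnd(long tsMillis, long sizeMillis) {
        return windowStart(tsMillis, sizeMillis) + sizeMillis;
    }

    public static void main(String[] args) {
        long size = Duration.ofSeconds(10).toMillis();
        long[] eventTimes = {1_000L, 9_999L, 10_000L, 23_500L};
        for (long ts : eventTimes) {
            System.out.println(ts + " -> [" + windowStart(ts, size) + ", " + windowEnd(ts, size) + ")");
        }
    }
}
```

Rows for the same user_id whose ts falls into the same interval are counted together, which is why the planner needs ts to be an event-time attribute rather than a plain TIMESTAMP(3) column.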

I tried another approach based on DDL, and it worked, so the problem is not with my Kafka source.

tEnv.sqlUpdate("CREATE TABLE user_behavior (\n" +
                "    user_id BIGINT,\n" +
                "    item_id BIGINT,\n" +
                "    category_id BIGINT,\n" +
                "    behavior STRING,\n" +
                "    ts TIMESTAMP(3),\n" +
//                "    proctime as PROCTIME(),   -- generate a processing-time column via a computed column\n" +
                "    WATERMARK FOR ts as ts - INTERVAL '5' SECOND  -- define the watermark on ts; ts becomes the event-time column\n" +
                ") WITH (\n" +
                "    'connector.type' = 'kafka',  -- use the kafka connector\n" +
                "    'connector.version' = 'universal',  -- kafka version; universal supports 0.11 and later\n" +
                "    'connector.topic' = 'user_behavior',  -- kafka topic\n" +
                "    'connector.startup-mode' = 'latest-offset',  -- start reading from the latest offset\n" +
                "    'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zookeeper address\n" +
                "    'connector.properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address\n" +
                "    'format.type' = 'json'  -- the source data format is json\n" +
                ")");
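To make the WATERMARK clause above more concrete, the following is a simplified plain-Java model of a watermark that trails the largest observed ts by 5 seconds, and of when a 10-second window would be treated as complete. It is only a sketch under stated assumptions: the class, its methods, and the completeness rule (watermark has reached the window's last millisecond) are illustrative and are not Flink's actual implementation.

```java
// Hypothetical illustration only; this is not part of the Flink API.
final class WatermarkSketch {
    static final long DELAY_MS = 5_000L;    // mirrors INTERVAL '5' SECOND in the DDL
    static final long WINDOW_MS = 10_000L;  // mirrors INTERVAL '10' SECOND in the query

    private long maxTs = Long.MIN_VALUE;

    // Record an event timestamp and return the resulting watermark.
    // Out-of-order events never move the watermark backwards.
    long onEvent(long tsMillis) {
        maxTs = Math.max(maxTs, tsMillis);
        return maxTs - DELAY_MS;
    }

    // Assumed rule: the window [windowStart, windowStart + WINDOW_MS) is complete
    // once the watermark has reached its last millisecond.
    static boolean windowComplete(long windowStart, long watermark) {
        return watermark >= windowStart + WINDOW_MS - 1;
    }

    public static void main(String[] args) {
        WatermarkSketch sketch = new WatermarkSketch();
        long[] eventTimes = {2_000L, 9_000L, 7_000L, 14_999L};
        for (long ts : eventTimes) {
            long watermark = sketch.onEvent(ts);
            System.out.println("ts=" + ts + " watermark=" + watermark
                    + " firstWindowComplete=" + windowComplete(0L, watermark));
        }
    }
}
```

In this model the event at 7000 arrives out of order but is still accepted, because the watermark is only at 4000 when it arrives. By comparison, watermarksPeriodicAscending() in the descriptor version assumes timestamps arrive in ascending order and allows no such delay.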

I hope someone can offer some suggestions. Thanks.

Re: Table API connect method timestamp watermark assignment problem

Posted by Lu Weizheng <lu...@hotmail.com>.
Thanks a lot. I hope it will be fixed soon!
________________________________
From: Jark Wu <im...@gmail.com>
Sent: March 3, 2020 11:25
To: Lu Weizheng <lu...@hotmail.com>
Cc: user@flink.apache.org <us...@flink.apache.org>
Subject: Re: Table API connect method timestamp watermark assignment problem

Hi Lu,

DDL and the Schema descriptor do not share the same code path. I suspect the Schema descriptor fails because of FLINK-16160 [1].
We will fix that in the next minor release. Please use DDL to define the watermark, which is also the recommended way to do it.
The current Schema descriptor will be refactored to share the DDL code path in the near future.

Best,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-16160


Re: Table API connect method timestamp watermark assignment problem

Posted by Jark Wu <im...@gmail.com>.
Hi Lu,

DDL and the Schema descriptor do not share the same code path. I suspect
the Schema descriptor fails because of FLINK-16160 [1].
We will fix that in the next minor release. Please use DDL to define the
watermark, which is also the recommended way to do it.
The current Schema descriptor will be refactored to share the DDL code
path in the near future.

Best,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-16160
