Posted to user-zh@flink.apache.org by 花乞丐 <hu...@163.com> on 2021/01/27 10:32:14 UTC

Flink SQL: handling a custom watermark

I am using Flink to consume messages from Kafka, assigning a watermark to each message, then converting the messages to Row and writing them to Hive. However, when the partition-time commit runs, the watermark is always the most negative long value, so the partitions are never committed and no data can be queried in Hive. The files do exist on HDFS, though. I am not sure what is causing this.
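For reference, the "most negative long" you are seeing is almost certainly Long.MIN_VALUE, which Flink uses as an operator's initial watermark before any watermark has actually been received; partition-time commit compares this value against partition time plus delay, so nothing ever commits. A trivial check of the value:

```java
public class InitialWatermark {
    public static void main(String[] args) {
        // Flink operators start at a watermark of Long.MIN_VALUE until the
        // first real watermark arrives downstream.
        System.out.println(Long.MIN_VALUE); // -9223372036854775808
    }
}
```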

FlinkKafkaConsumerBase<FlatMessage> waterMessages =
        messages.assignTimestampsAndWatermarks(
                WatermarkStrategy.<FlatMessage>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner(new SerializableTimestampAssigner<FlatMessage>() {
                            @Override
                            public long extractTimestamp(FlatMessage element, long recordTimestamp) {
                                return element.getEs();
                            }
                        }));
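One likely cause: a watermark assigned on the DataStream is not automatically visible to the Table/SQL layer once the stream is registered as a table, and the Hive sink's partition-time commit trigger looks at the SQL pipeline's watermark. A hedged sketch (column names taken from your query, connector options elided, and `es` assumed to be epoch milliseconds) that declares the watermark directly in the source table DDL instead:

```sql
-- Hypothetical sketch: declare the event-time column and watermark in the
-- table definition so the SQL pipeline itself carries a watermark.
CREATE TABLE trade_real_delivery_tmp (
    -- ... other columns ...
    es BIGINT,
    ts AS TO_TIMESTAMP(FROM_UNIXTIME(es / 1000)),
    WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
) WITH (
    'connector' = 'kafka'
    -- ... topic, format, etc. ...
);
```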

INSERT INTO ods.trade_real_delivery_incr
SELECT
    id,
    original_id,
    order_id,
    order_code,
    business_type,
    delivery_id,
    num,
    weight,
    creator_id,
    creator,
    admin_id,
    admin_name,
    admin_depart_id,
    admin_depart_code,
    admin_depart_name,
    create_time,
    update_time,
    es,
    ts,
    op,
    dt
FROM trade_real_delivery_tmp
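Also worth checking: partition-time commit only fires when the Hive sink table sets the trigger and a time-extractor pattern that matches its partition columns. A hedged sketch of the relevant table properties (the option keys are the standard Flink filesystem/Hive connector options; the `dt` partition column and pattern are assumptions based on the query above):

```sql
-- Hypothetical sketch of the Hive sink table's commit properties.
ALTER TABLE ods.trade_real_delivery_incr SET TBLPROPERTIES (
    'sink.partition-commit.trigger' = 'partition-time',
    'sink.partition-commit.delay' = '0 s',
    'partition.time-extractor.timestamp-pattern' = '$dt 00:00:00',
    'sink.partition-commit.policy.kind' = 'metastore,success-file'
);
```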




--
Sent from: http://apache-flink.147419.n8.nabble.com/