You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by 花乞丐 <hu...@163.com> on 2021/01/27 10:32:14 UTC
flink sql 处理自定义watermark
我在使用Flink消费kafka中的消息,并对kafka中的每条消息增加水印,然后将kafka转换成Row,写入Hive,但是在提交PartitionTimeCommit的时候水印一直是长整形的最大负数,导致一直没办法提交分区,在hive中无法查询到数据。但是在hdfs上是有文件的,目前不太清楚是什么情况导致!
FlinkKafkaConsumerBase<FlatMessage> waterMessages =
messages.assignTimestampsAndWatermarks(
WatermarkStrategy.<FlatMessage>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner(new
SerializableTimestampAssigner<FlatMessage>() {
@Override
public long extractTimestamp(FlatMessage
element, long recordTimestamp) {
Long es = element.getEs();
return es;
}
})
);
INSERT INTO ods.trade_real_delivery_incr\n" +
"SELECT\n" +
" id,\n" +
" original_id,\n" +
" order_id,\n" +
" order_code,\n" +
" business_type,\n" +
" delivery_id,\n" +
" num,\n" +
" weight,\n" +
" creator_id,\n" +
" creator,\n" +
" admin_id,\n" +
" admin_name,\n" +
" admin_depart_id,\n" +
" admin_depart_code,\n" +
" admin_depart_name,\n" +
" create_time,\n" +
" update_time,\n" +
" es,\n" +
" ts,\n" +
" op,\n" +
" dt\n" +
"FROM trade_real_delivery_tmp
--
Sent from: http://apache-flink.147419.n8.nabble.com/