Posted to user-zh@flink.apache.org by wei_yuze <we...@qq.com.INVALID> on 2023/02/24 06:50:17 UTC
Flink SQL time semantics not taking effect when computing Window Top-N
Hello!
I ran into a problem while running a Flink program and would like to ask for your advice.
Goal of the program:
Compute a windowed Top-5 with Flink SQL, using a one-hour window. The source is Kafka, into which I push data in batches. The computed Top-5 results are written to MySQL.
Problem:
The one-hour window does not take effect at all; I tested both event-time and processing-time semantics. Every time I push a batch of data into Kafka, five new Top-5 rows appear in MySQL, even though the time between two batches is nowhere near one hour.
Preliminary localization of the problem code:
TUMBLE( TABLE watermarkedTable, DESCRIPTOR(ts), INTERVAL '1' HOUR )
Complete source code:
final StreamExecutionEnvironment streamExecutionEnvironment =
        StreamExecutionEnvironment.getExecutionEnvironment();
// Create table environment
StreamTableEnvironment streamTableEnvironment =
        StreamTableEnvironment.create(streamExecutionEnvironment);
// Connect to the Kafka source
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
        .setBootstrapServers(Config.KAFKA_BROKERS)
        .setTopics(Config.KAFKA_TOPIC_EVENT)
        .setGroupId("flink-consumer")
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
DataStreamSource<String> stringStream = streamExecutionEnvironment.fromSource(
        kafkaSource,
        WatermarkStrategy.noWatermarks(),
        "Kafka string source without watermark");
// Deserialize string stream
SingleOutputStreamOperator<Event> deserializedStream = stringStream.map(
        new MapFunction<String, Event>() {
            @Override
            public Event map(String jsonString) throws Exception {
                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                ObjectMapper objectMapper = new ObjectMapper();
                objectMapper.setDateFormat(simpleDateFormat);
                objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
                objectMapper.enable(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT);
                return objectMapper.readValue(jsonString, Event.class);
            }
        });
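For reference, a minimal stdlib-only sketch (my addition; Jackson is omitted) of the timestamp handling the mapper is configured with. The "yyyy-MM-dd HH:mm:ss" pattern has second granularity and is interpreted in the JVM's default time zone, which is worth keeping in mind when comparing event times across machines:

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class TimestampParsing {
    // Same pattern the ObjectMapper above is configured with.
    static final SimpleDateFormat FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    // Hypothetical helper: string -> epoch milliseconds, as later used for the rowtime.
    static long toEpochMillis(String occurrentTime) {
        try {
            return FORMAT.parse(occurrentTime).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException("bad timestamp: " + occurrentTime, e);
        }
    }

    public static void main(String[] args) {
        long millis = toEpochMillis("2023-02-24 06:50:17");
        // Round-trip: formatting the parsed instant reproduces the input string.
        System.out.println(FORMAT.format(new Date(millis)));
    }
}
```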
SingleOutputStreamOperator<Event> watermarkedStream = deserializedStream.assignTimestampsAndWatermarks(
        WatermarkStrategy
                .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(0L))
                // .withTimestampAssigner((event, l) -> event.getTs().getTime())
                .withTimestampAssigner(
                        new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event event, long kafkaTimestamp) {
                                return event.getOccurrentTime().getTime();
                            }
                        }));
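As a side note on the strategy above: with forBoundedOutOfOrderness(Duration.ofSeconds(0L)), Flink's built-in generator periodically emits a watermark of maxSeenTimestamp - 1 ms, so an event-time window can only fire once an event past the window end has been seen. A small sketch (my addition) that mirrors, but does not call, that arithmetic:

```java
public class BoundedOutOfOrderness {
    private final long outOfOrdernessMillis;
    // Start value chosen so currentWatermark() does not underflow before any event,
    // matching Flink's BoundedOutOfOrdernessWatermarks.
    private long maxTimestamp;

    BoundedOutOfOrderness(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // Track the maximum event timestamp seen so far; late events do not move it back.
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // Watermark formula: everything at or below this instant is considered complete.
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }
}
```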
// Convert the stream into a table
Table watermarkedTable = streamTableEnvironment.fromDataStream(
        watermarkedStream,
        $("uid"),
        $("eventId"),
        // $("eventName"),
        $("serviceId"),
        // $("serviceName"),
        $("occurrentTime").rowtime().as("ts"));
// Register the table for the SQL API
streamTableEnvironment.createTemporaryView("watermarkedTable", watermarkedTable);
// watermarkedTable.printSchema();
String countQuery =
        "SELECT eventId, " +
        // "eventName, " +
        "serviceId, " +
        // "serviceName, " +
        "COUNT(*) as eventCount, " +
        "COUNT(DISTINCT uid) as userCount " +
        "FROM TABLE ( " +
        "TUMBLE( TABLE watermarkedTable, DESCRIPTOR(ts), INTERVAL '1' HOUR ) " +
        ") " +
        "GROUP BY eventId, serviceId ";
String sortEventQuery =
        "SELECT *, " +
        "ROW_NUMBER() OVER ( " +
        "ORDER BY eventCount desc " +
        ") AS eventRank " +
        "FROM (" + countQuery + ") ";
String top5Query =
        "SELECT * " +
        "FROM (" + sortEventQuery + ") " +
        "WHERE eventRank <= 5 ";
// Run the SQL and obtain the result table
Table queryResultTable = streamTableEnvironment.sqlQuery(top5Query);
// Create sink table
String top5EventTableDDL =
        "CREATE TABLE top5EventTable ( ...omitted for now because it contains account credentials... )";
streamTableEnvironment.executeSql(top5EventTableDDL);
// Persist the results
TableResult sinkResult = queryResultTable.executeInsert("top5EventTable");
Thank you for taking the time to look at this question!
Lucas