Posted to user@flink.apache.org by HG <ha...@gmail.com> on 2022/02/21 15:06:14 UTC
watermark/timestamp between kafka datastream - table - datastream
Hi,
In the code below I read from Kafka and convert the datastream from
ObjectNode to Tuple4.
Once the stream is of type Tuple4, I apply a watermark strategy whose
timestamp assigner reads a specific field.
When the stream is converted to a table, I declare a watermark on a
specific column using SOURCE_WATERMARK().
This is in order to use event time instead of processing time.
But somehow the watermarks and timestamps do not work as I intend.
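To make the expectation concrete, here is a minimal plain-Java sketch of how a bounded-out-of-orderness watermark follows the event timestamps. It is a model only, not Flink code: the class name BoundedWatermarkSketch, the 5 second bound, and the second (late) timestamp are assumptions for illustration. Only the first timestamp comes from the sample record.

```java
import java.time.Duration;
import java.time.Instant;

/** Plain-Java model of a bounded-out-of-orderness watermark; not Flink code. */
final class BoundedWatermarkSketch {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedWatermarkSketch(Duration maxOutOfOrderness) {
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        // Start low enough that the first watermark computation cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    /** Track the highest event timestamp seen so far. */
    void onEvent(long eventTimestampMillis) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestampMillis);
    }

    /** The watermark trails the highest timestamp by the bound plus 1 ms. */
    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    public static void main(String[] args) {
        // Assumed bound of 5 seconds; the job reads its bound from configuration.
        BoundedWatermarkSketch wm = new BoundedWatermarkSketch(Duration.ofSeconds(5));
        wm.onEvent(1644580984105L); // event time of the sample record (Feb 11th)
        wm.onEvent(1644580983000L); // hypothetical late event; the watermark does not move back
        System.out.println(Instant.ofEpochMilli(wm.currentWatermark()));
    }
}
```

Under these assumptions the watermark stays on Feb 11th, a few seconds behind the newest event. A watermark dated Feb 21st would suggest that it is derived from some other timestamp than the assigned field.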
After I convert the query result back to a datastream and add about 30000
events to the kafka queue, only 2 identical records are output:
==> flink-root-taskexecutor-0-nlvora196.out <==
+I[0000017e4ff4fd21-944511, 2022-02-11T12:03:04.105Z, 1644580984105, 0, null]
+I[0000017e4ff4fd21-944511, 2022-02-11T12:03:04.105Z, 1644580984105, 0, null]
The timestamps in the datastream are all from Feb 11th.
The GUI shows timestamps from today (Feb 21st).
Am I missing a part? Doing something wrong?
Please help
[image: watermark-step2.png]
[image: watermark-step3.png]
KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
        .setProperties(kafkaProps)
        .setProperty("ssl.truststore.type", trustStoreType)
        .setProperty("ssl.truststore.password", trustStorePassword)
        .setProperty("ssl.truststore.location", trustStoreLocation)
        .setProperty("security.protocol", securityProtocol)
        .setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
        .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
        .setGroupId(groupId)
        .setTopics(kafkaInputTopic)
        .setDeserializer(KafkaRecordDeserializationSchema.of(
                new JSONKeyValueDeserializationSchema(fetchMetadata)))
        .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
        //.setStartingOffsets(OffsetsInitializer.earliest())
        //.setBounded(OffsetsInitializer.latest())
        .build();
DataStream<ObjectNode> ds = env.fromSource(
        source, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source");
DataStream<Tuple4<Long, Long, String, String>> tuple4ds = ds.flatMap(new Splitter());
WatermarkStrategy<Tuple4<Long, Long, String, String>> wmstrategy = WatermarkStrategy
        .<Tuple4<Long, Long, String, String>>forBoundedOutOfOrderness(
                Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
        .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
        .withTimestampAssigner(
                new SerializableTimestampAssigner<Tuple4<Long, Long, String, String>>() {
                    @Override
                    public long extractTimestamp(
                            Tuple4<Long, Long, String, String> element, long eventTime) {
                        return element.f0;
                    }
                });
DataStream<Tuple4<Long, Long, String, String>> tuple4dswm =
        tuple4ds.assignTimestampsAndWatermarks(wmstrategy);
Table tupled4DsTable = tableEnv.fromDataStream(tuple4ds,
        Schema.newBuilder()
                .columnByExpression(
                        "f0_ts",
                        Expressions.callSql("TO_TIMESTAMP_LTZ(f0, 3)"))
                .column("f1", "BIGINT")
                .column("f2", "STRING")
                .column("f3", "STRING")
                .watermark("f0_ts", "SOURCE_WATERMARK()")
                .build())
        .as("eventTime", "handlingTime", "transactionId", "originalEvent");
tupled4DsTable.printSchema();
Table result = tableEnv.sqlQuery("select transactionId" +
        ", originalEvent" +
        ", eventTime" +
        ", handlingTime" +
        ", handlingTime - ifnull(lag(handlingTime) over (partition by transactionId order by eventTime), handlingTime) as elapsedTime" +
        ", lag(handlingTime) over (partition by transactionId order by eventTime) prevHandlingTime" +
        " from " + tupled4DsTable + " order by eventTime");
result.printSchema();
DataStream<Row> xx = tableEnv.toDataStream(result);
xx.print();
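
For clarity, the result the query is meant to produce can be sketched in plain Java, without Flink. This is only a model of the intended semantics, under the assumption that events are processed in eventTime order: for each transactionId, elapsedTime is the difference to the previous handlingTime, and 0 for the first event. The class ElapsedTimeSketch, its nested types, and the sample values are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of the LAG-based query; not Flink code. */
final class ElapsedTimeSketch {

    /** Input event; field names mirror the table columns. */
    static final class Event {
        final String transactionId;
        final long eventTime;
        final long handlingTime;

        Event(String transactionId, long eventTime, long handlingTime) {
            this.transactionId = transactionId;
            this.eventTime = eventTime;
            this.handlingTime = handlingTime;
        }
    }

    /** Output row; prevHandlingTime is null for the first event of a transaction. */
    static final class ResultRow {
        final String transactionId;
        final long eventTime;
        final long elapsedTime;
        final Long prevHandlingTime;

        ResultRow(String transactionId, long eventTime, long elapsedTime, Long prevHandlingTime) {
            this.transactionId = transactionId;
            this.eventTime = eventTime;
            this.elapsedTime = elapsedTime;
            this.prevHandlingTime = prevHandlingTime;
        }
    }

    static List<ResultRow> compute(List<Event> events) {
        // "order by eventTime": sort a copy so the caller's list is untouched.
        List<Event> sorted = new ArrayList<>(events);
        sorted.sort(Comparator.comparingLong((Event e) -> e.eventTime));

        // "partition by transactionId": remember the last handlingTime per transaction.
        Map<String, Long> lastHandlingTime = new HashMap<>();
        List<ResultRow> rows = new ArrayList<>();
        for (Event e : sorted) {
            Long prev = lastHandlingTime.get(e.transactionId);
            // ifnull(lag(handlingTime), handlingTime): the first event yields 0.
            long elapsed = e.handlingTime - (prev == null ? e.handlingTime : prev);
            rows.add(new ResultRow(e.transactionId, e.eventTime, elapsed, prev));
            lastHandlingTime.put(e.transactionId, e.handlingTime);
        }
        return rows;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("tx-1", 100L, 1000L),
                new Event("tx-1", 200L, 1500L),
                new Event("tx-2", 150L, 1200L));
        for (ResultRow r : compute(events)) {
            System.out.println(r.transactionId + " eventTime=" + r.eventTime
                    + " elapsedTime=" + r.elapsedTime
                    + " prevHandlingTime=" + r.prevHandlingTime);
        }
    }
}
```

In this model every transaction with more than one event produces a row with a non-null prevHandlingTime, which is what I expected to see in the output instead of two identical rows with elapsedTime 0 and null.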