Posted to user@flink.apache.org by HG <ha...@gmail.com> on 2022/02/21 15:06:14 UTC

watermark/timestamp between kafka datastream - table - datastream

Hi,

In the code below I read from Kafka and convert the DataStream from
ObjectNode to Tuple4 with a flatMap (Splitter, sketched directly below).
Once the stream is of type Tuple4, I assign a timestamp assigner that
takes the event timestamp from a specific field.
When the stream is converted to a table, I set SOURCE_WATERMARK() on a
specific column.
This is in order to use event time instead of processing time.
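
For reference, Splitter is a simple flatMap from ObjectNode to Tuple4,
along these lines (a minimal sketch; the JSON field names eventTime,
handlingTime and transactionId are placeholders for illustration, not
necessarily the real ones):

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple4;
    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
    import org.apache.flink.util.Collector;

    public static class Splitter
            implements FlatMapFunction<ObjectNode, Tuple4<Long, Long, String, String>> {
        @Override
        public void flatMap(ObjectNode record,
                            Collector<Tuple4<Long, Long, String, String>> out) {
            // JSONKeyValueDeserializationSchema wraps each message in a node
            // with "key", "value" and (optionally) "metadata" fields.
            ObjectNode value = (ObjectNode) record.get("value");
            out.collect(Tuple4.of(
                    value.get("eventTime").asLong(),     // f0: event time (epoch millis)
                    value.get("handlingTime").asLong(),  // f1: handling time (epoch millis)
                    value.get("transactionId").asText(), // f2: transaction id
                    value.toString()));                  // f3: the original event as a string
        }
    }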

But somehow the watermarks and timestamps do not work as I intend.

After I convert the query result back to a datastream and add about 30,000
events to the Kafka topic, only 2 identical records are output:
==> flink-root-taskexecutor-0-nlvora196.out <==
+I[0000017e4ff4fd21-944511, 2022-02-11T12:03:04.105Z, 1644580984105, 0, null]
+I[0000017e4ff4fd21-944511, 2022-02-11T12:03:04.105Z, 1644580984105, 0, null]

The timestamps in the datastream are all from Feb 11th, but the Flink GUI
shows timestamps from today (Feb 21st).

Am I missing a step? Am I doing something wrong?

Please help


[image: watermark-step2.png]
[image: watermark-step3.png]

        KafkaSource<ObjectNode> source = KafkaSource.<ObjectNode>builder()
                .setProperties(kafkaProps)
                .setProperty("ssl.truststore.type", trustStoreType)
                .setProperty("ssl.truststore.password", trustStorePassword)
                .setProperty("ssl.truststore.location", trustStoreLocation)
                .setProperty("security.protocol", securityProtocol)
                .setProperty("partition.discovery.interval.ms", partitionDiscoveryIntervalMs)
                .setProperty("commit.offsets.on.checkpoint", commitOffsetsOnCheckpoint)
                .setGroupId(groupId)
                .setTopics(kafkaInputTopic)
                .setDeserializer(KafkaRecordDeserializationSchema.of(
                        new JSONKeyValueDeserializationSchema(fetchMetadata)))
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
                //.setStartingOffsets(OffsetsInitializer.earliest())
                //.setBounded(OffsetsInitializer.latest())
                .build();

        DataStream<ObjectNode> ds = env.fromSource(
                source, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source");
        DataStream<Tuple4<Long, Long, String, String>> tuple4ds = ds.flatMap(new Splitter());


        WatermarkStrategy<Tuple4<Long, Long, String, String>> wmstrategy = WatermarkStrategy
                .<Tuple4<Long, Long, String, String>>forBoundedOutOfOrderness(
                        Duration.ofSeconds(Integer.parseInt(outOfOrderness)))
                .withIdleness(Duration.ofSeconds(Integer.parseInt(idleness)))
                .withTimestampAssigner(
                        new SerializableTimestampAssigner<Tuple4<Long, Long, String, String>>() {
                            @Override
                            public long extractTimestamp(
                                    Tuple4<Long, Long, String, String> element,
                                    long recordTimestamp) {
                                // Event time is the epoch-millis value in field f0.
                                return element.f0;
                            }
                        });

        DataStream<Tuple4<Long, Long, String, String>> tuple4dswm =
                tuple4ds.assignTimestampsAndWatermarks(wmstrategy);

        Table tupled4DsTable = tableEnv.fromDataStream(
                tuple4ds,
                Schema.newBuilder()
                        .columnByExpression("f0_ts", Expressions.callSql("TO_TIMESTAMP_LTZ(f0, 3)"))
                        .column("f1", "BIGINT")
                        .column("f2", "STRING")
                        .column("f3", "STRING")
                        .watermark("f0_ts", "SOURCE_WATERMARK()")
                        .build())
                .as("eventTime", "handlingTime", "transactionId", "originalEvent");

        tupled4DsTable.printSchema();


        Table result = tableEnv.sqlQuery("select transactionId" +
                ", originalEvent" +
                ", eventTime" +
                ", handlingTime" +
                ", handlingTime - ifnull(lag(handlingTime) over (partition by transactionId order by eventTime), handlingTime) as elapsedTime" +
                ", lag(handlingTime) over (partition by transactionId order by eventTime) prevHandlingTime" +
                " from " + tupled4DsTable + " order by eventTime");

        result.printSchema();

        DataStream<Row> xx = tableEnv.toDataStream(result);
        xx.print();
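
To check what timestamps actually end up on the stream, I could add
something like this after assignTimestampsAndWatermarks (just a debugging
sketch, not part of the job above; it needs
org.apache.flink.streaming.api.functions.ProcessFunction and
org.apache.flink.util.Collector):

        tuple4dswm
                .process(new ProcessFunction<Tuple4<Long, Long, String, String>, String>() {
                    @Override
                    public void processElement(
                            Tuple4<Long, Long, String, String> element,
                            Context ctx,
                            Collector<String> out) {
                        // ctx.timestamp() is the timestamp assigned by the
                        // WatermarkStrategy; currentWatermark() shows how far
                        // the watermark had advanced when this element arrived.
                        out.collect("ts=" + ctx.timestamp()
                                + " wm=" + ctx.timerService().currentWatermark()
                                + " " + element.f2);
                    }
                })
                .print();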