Posted to issues@flink.apache.org by "Vadim Vararu (Jira)" <ji...@apache.org> on 2024/04/19 04:49:00 UTC
[jira] (FLINK-35115) Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
[ https://issues.apache.org/jira/browse/FLINK-35115 ]
Vadim Vararu deleted comment on FLINK-35115:
--------------------------------------
was (Author: JIRAUSER305101):
[~a.pilipenko] 4.3.0 will be released for Flink 1.18 as well, right?
> Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
> -----------------------------------------------------------------------------
>
> Key: FLINK-35115
> URL: https://issues.apache.org/jira/browse/FLINK-35115
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kinesis
> Affects Versions: 1.15.4, aws-connector-4.0.0, aws-connector-4.1.0, aws-connector-4.2.0, 1.16.3, 1.17.2, 1.18.1
> Environment: The issue happens in a *Kinesis -> Flink -> Kafka* exactly-once setup with:
> * Flink versions checked 1.16.3 and 1.18.1
> * Kinesis connector checked 1.16.3 and 4.2.0-1.18
> * checkpointing configured at 1 minute with EXACTLY_ONCE mode:
> {code:java}
> StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
> execEnv.enableCheckpointing(60000, EXACTLY_ONCE);
> execEnv.getCheckpointConfig().setCheckpointTimeout(90000);
> execEnv.getCheckpointConfig().setCheckpointStorage(CHECKPOINTS_PATH);
> {code}
> * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee:
> {code:java}
> Properties sinkConfig = new Properties();
> sinkConfig.put("transaction.timeout.ms", 480000);
> KafkaSink<String> sink = KafkaSink.<String>builder()
>     .setBootstrapServers("localhost:9092")
>     .setTransactionalIdPrefix("test-prefix")
>     .setDeliveryGuarantee(EXACTLY_ONCE)
>     .setKafkaProducerConfig(sinkConfig)
>     .setRecordSerializer(
>         (KafkaRecordSerializationSchema<String>) (element, context, timestamp) ->
>             new ProducerRecord<>("test-output-topic", null, element.getBytes()))
>     .build(); {code}
> * Kinesis consumer defined as:
> {code:java}
> FlinkKinesisConsumer<ByteBuffer> flinkKinesisConsumer = new FlinkKinesisConsumer<>(
>     "test-stream",
>     new AbstractDeserializationSchema<>() {
>         @Override
>         public ByteBuffer deserialize(byte[] bytes) {
>             return ByteBuffer.wrap(bytes);
>         }
>     },
>     props); {code}
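The `props` object referenced above is not shown in the report. A minimal, hypothetical configuration might look like the sketch below; the region and initial position are assumptions for illustration, not values from the report (the string keys mirror the Kinesis connector's `ConsumerConfigConstants.AWS_REGION` and `ConsumerConfigConstants.STREAM_INITIAL_POSITION`):

```java
import java.util.Properties;

public class KinesisConsumerProps {
    // Hypothetical consumer configuration; values are assumed for illustration.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("aws.region", "us-east-1");           // ConsumerConfigConstants.AWS_REGION
        props.setProperty("flink.stream.initialpos", "LATEST"); // ConsumerConfigConstants.STREAM_INITIAL_POSITION
        return props;
    }
}
```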
>
> Reporter: Vadim Vararu
> Assignee: Aleksandr Pilipenko
> Priority: Blocker
> Labels: kinesis, pull-request-available
> Fix For: aws-connector-4.3.0
>
>
> In an exactly-once Kinesis -> Flink -> Kafka job, triggering a stop-with-savepoint causes Flink to duplicate in Kafka, on resume, all records processed between the last checkpoint and the savepoint:
> * Event1 is written to Kinesis
> * Event1 is processed by Flink
> * Event1 is committed to Kafka at the checkpoint
> * ............................................................................
> * Event2 is written to Kinesis
> * Event2 is processed by Flink
> * Stop with savepoint is triggered manually
> * Event2 is committed to Kafka
> * ............................................................................
> * Job is resumed from the savepoint
> * *{color:#FF0000}Event2 is written again to Kafka at the first checkpoint{color}*
>
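The replay in the timeline above can be sketched with a toy model. This is illustrative logic only, not the connector's actual code: it shows why persisting the sequence number from the previous periodic checkpoint, instead of the last record emitted, makes the restored job re-read (and Kafka re-receive) the records in between.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the reported behaviour (NOT the connector's implementation).
public class SequenceNumberSketch {
    long lastEmitted;       // position of the last record sent downstream
    long lastCheckpointed;  // position captured at the last periodic checkpoint

    void emit(long seq) { lastEmitted = seq; }
    void checkpoint()   { lastCheckpointed = lastEmitted; }

    long buggySavepoint()   { return lastCheckpointed; } // observed: stale position is persisted
    long correctSavepoint() { return lastEmitted; }      // expected: last emitted position

    // After restore, every record newer than the restored position is replayed.
    static List<Long> replayed(long restored, List<Long> stream) {
        List<Long> out = new ArrayList<>();
        for (long s : stream) if (s > restored) out.add(s);
        return out;
    }
}
```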
> {color:#172b4d}I believe that it's a Kinesis connector issue for 2 reasons:{color}
> * I've checked the actual Kinesis sequence number in the _metadata file generated at stop-with-savepoint: it is the sequence number from the checkpoint preceding the savepoint, not that of the last record committed to Kafka.
> * I've tested exactly the same job with Kafka as the source instead of Kinesis, and the behaviour does not reproduce.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)