Posted to user@flink.apache.org by Lars Skjærven <la...@tv2.no> on 2021/04/28 08:36:33 UTC

KafkaSourceBuilder causing invalid negative offset on checkpointing

Hello,
I ran into some issues when using the new KafkaSourceBuilder (running Flink 1.12.2, Scala 2.12.13, on the Ververica Platform in a container with Java 8). Initially it generated warnings about the Kafka configuration, but the job was able to consume and produce messages.


The configuration 'client.id.prefix' was supplied but isn't a known config.
The configuration 'partition.discovery.interval.ms' was supplied but isn't a known config.


Finally the job crashed with a checkpointing error:


java.lang.Exception: Could not perform checkpoint 10 for operator Source: progress source (4/6)#9.
....
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 10 for operator Source: progress source (4/6)#9. Failure reason: Checkpoint was declined.
...
Caused by: java.lang.IllegalArgumentException: Invalid negative offset



After switching back to FlinkKafkaConsumer, the warnings about the Kafka config disappeared and the job was able to checkpoint successfully.
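
For reference, the fallback setup looked roughly like this (a sketch only, with placeholder names for the config object and the Protobuf deserialization schema, not the exact production code):

    import java.util.Properties
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

    val props = new Properties()
    props.setProperty("bootstrap.servers", myConfig.kafkaBrokers)
    props.setProperty("group.id", myConfig.kafkaGroupId)

    // SensorInputPBSchema is a hypothetical DeserializationSchema[SensorInput]
    // wrapping the Protobuf parser.
    val consumer = new FlinkKafkaConsumer[SensorInput](
      myConfig.kafkaTopicIn,
      new SensorInputPBSchema,
      props)
    consumer.setStartFromEarliest()

    val sensorStream = env.addSource(consumer)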

I'm wondering if the warnings and the errors are connected, and if there is a compatibility issue between the newer KafkaSourceBuilder and Kafka 2.5?

Thanks,
L


Re: KafkaSourceBuilder causing invalid negative offset on checkpointing

Posted by Till Rohrmann <tr...@apache.org>.
Thanks for the stack traces, Lars. With them I could confirm that the
problem should be fixed by FLINK-20114 [1]. The fix will be contained
in the 1.12.4 and 1.13.0 releases. Sorry for the inconvenience.

[1] https://issues.apache.org/jira/browse/FLINK-20114

Cheers,
Till

On Thu, Apr 29, 2021 at 8:30 PM Lars Skjærven <la...@tv2.no> wrote:

> Unfortunately, I only have the truncated stack trace available (from the
> Flink UI).
> L
>
>
> 2021-04-27 16:32:02
> java.lang.Exception: Could not perform checkpoint 10 for operator Source: progress source (4/6)#9.
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:924)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$8(StreamTask.java:885)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> 	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
> 	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
> 	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
> 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 10 for operator Source: progress source (4/6)#9. Failure reason: Checkpoint was declined.
> 	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241)
> 	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
> 	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:686)
> 	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:607)
> 	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:572)
> 	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1004)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:988)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:912)
> 	... 10 more
> Caused by: java.lang.IllegalArgumentException: Invalid negative offset
> 	at org.apache.kafka.clients.consumer.OffsetAndMetadata.<init>(OffsetAndMetadata.java:50)
> 	at org.apache.kafka.clients.consumer.OffsetAndMetadata.<init>(OffsetAndMetadata.java:69)
> 	at org.apache.flink.connector.kafka.source.reader.KafkaSourceReader.snapshotState(KafkaSourceReader.java:96)
> 	at org.apache.flink.streaming.api.operators.SourceOperator.snapshotState(SourceOperator.java:288)
> 	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:205)
> 	... 20 more
>
>
>
> ------------------------------
> *From:* Till Rohrmann <tr...@apache.org>
> *Sent:* Thursday, April 29, 2021 18:44
> *To:* Lars Skjærven <la...@tv2.no>
> *Cc:* Becket Qin <be...@gmail.com>; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: KafkaSourceBuilder causing invalid negative offset on
> checkpointing
>
> Thanks for the additional information, Lars. Could you maybe also share the
> full stack traces of the errors you see when the checkpoint fails?
>
> @Becket Qin <be...@gmail.com> is it a known issue with the new Kafka
> sources trying to checkpoint negative offsets?
>
> Cheers,
> Till
>
> On Thu, Apr 29, 2021 at 1:06 PM Lars Skjærven <la...@tv2.no>
> wrote:
>
> Thanks Till.
>
> Here is how we created the KafkaSource:
>
>     val sensorSource = KafkaSource.builder[SensorInput]()
>       .setBootstrapServers(myConfig.kafkaBrokers)
>       .setGroupId(myConfig.kafkaGroupId)
>       .setTopics(myConfig.kafkaTopicIn)
>       .setDeserializer(new SensorInputPBDeserializationSchema)
>       .setStartingOffsets(OffsetsInitializer.earliest())
>       .build()
>
> The stream was built with
>
>     env.fromSource(sensorSource, WatermarkStrategy.forMonotonousTimestamps(), "sensor events")
>
> The SensorInputPBDeserializationSchema is a basic KafkaRecordDeserializer
> that does SensorInputPB.parseFrom(record.value()) and finally
> collector.collect(v)
>
> From here on we're doing a keyed windowed aggregation with .keyBy(...).
> window(EventTimeSessionWindows.withGap(Time.seconds(60))).aggregate(new
> SensorEventAggregator)
>
> L
>
>
> ------------------------------
> *From:* Till Rohrmann <tr...@apache.org>
> *Sent:* Thursday, April 29, 2021 09:16
> *To:* Lars Skjærven <la...@tv2.no>; Becket Qin <
> becket.qin@gmail.com>
> *Cc:* user@flink.apache.org <us...@flink.apache.org>
> *Subject:* Re: KafkaSourceBuilder causing invalid negative offset on
> checkpointing
>
> Hi Lars,
>
> The KafkaSourceBuilder constructs the new KafkaSource, which has not been
> fully hardened in 1.12.2. In fact, it should not be documented yet. I think
> you are running into an instability/bug of this new source. The new Kafka
> source should be hardened a lot more in the 1.13.0 release.
>
> Could you tell us exactly how you created the KafkaSource so that we can
> verify that this problem has been properly fixed in the 1.13.0 release? I
> am also pulling in Becket who is the original author of this connector. He
> might be able to tell you more.
>
> Cheers,
> Till
>
> On Wed, Apr 28, 2021 at 10:36 AM Lars Skjærven <la...@tv2.no>
> wrote:
>
> Hello,
> I ran into some issues when using the new KafkaSourceBuilder (running
> Flink 1.12.2, Scala 2.12.13, on the Ververica Platform in a container with
> Java 8). Initially it generated warnings about the Kafka configuration, but
> the job was able to consume and produce messages.
>
> The configuration 'client.id.prefix' was supplied but isn't a known config.	
> The configuration 'partition.discovery.interval.ms' was supplied but isn't a known config.	
>
>
> Finally the job crashed with a checkpointing error:
>
> java.lang.Exception: Could not perform checkpoint 10 for operator Source: progress source (4/6)#9.
> ....
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 10 for operator Source: progress source (4/6)#9. Failure reason: Checkpoint was declined.
> ...
> Caused by: java.lang.IllegalArgumentException: Invalid negative offset
>
>
>
> After switching back to FlinkKafkaConsumer, the warnings about the Kafka
> config disappeared and the job was able to checkpoint successfully.
>
> I'm wondering if the warnings and the errors are connected, and if there
> is a compatibility issue between the newer KafkaSourceBuilder and Kafka
> 2.5?
>
> Thanks,
> L
>
>

Re: KafkaSourceBuilder causing invalid negative offset on checkpointing

Posted by Lars Skjærven <la...@tv2.no>.
Unfortunately, I only have the truncated stack trace available (from the Flink UI).
L



2021-04-27 16:32:02
java.lang.Exception: Could not perform checkpoint 10 for operator Source: progress source (4/6)#9.
        at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:924)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$triggerCheckpointAsync$8(StreamTask.java:885)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
        at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:317)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:189)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 10 for operator Source: progress source (4/6)#9. Failure reason: Checkpoint was declined.
        at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:241)
        at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:162)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:686)
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:607)
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:572)
        at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:298)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$9(StreamTask.java:1004)
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:988)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:912)
        ... 10 more
Caused by: java.lang.IllegalArgumentException: Invalid negative offset
        at org.apache.kafka.clients.consumer.OffsetAndMetadata.<init>(OffsetAndMetadata.java:50)
        at org.apache.kafka.clients.consumer.OffsetAndMetadata.<init>(OffsetAndMetadata.java:69)
        at org.apache.flink.connector.kafka.source.reader.KafkaSourceReader.snapshotState(KafkaSourceReader.java:96)
        at org.apache.flink.streaming.api.operators.SourceOperator.snapshotState(SourceOperator.java:288)
        at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:205)
        ... 20 more


________________________________
From: Till Rohrmann <tr...@apache.org>
Sent: Thursday, April 29, 2021 18:44
To: Lars Skjærven <la...@tv2.no>
Cc: Becket Qin <be...@gmail.com>; user@flink.apache.org <us...@flink.apache.org>
Subject: Re: KafkaSourceBuilder causing invalid negative offset on checkpointing

Thanks for the additional information, Lars. Could you maybe also share the full stack traces of the errors you see when the checkpoint fails?

@Becket Qin <be...@gmail.com> is it a known issue with the new Kafka sources trying to checkpoint negative offsets?

Cheers,
Till

On Thu, Apr 29, 2021 at 1:06 PM Lars Skjærven <la...@tv2.no> wrote:
Thanks Till.

Here is how we created the KafkaSource:

    val sensorSource = KafkaSource.builder[SensorInput]()
      .setBootstrapServers(myConfig.kafkaBrokers)
      .setGroupId(myConfig.kafkaGroupId)
      .setTopics(myConfig.kafkaTopicIn)
      .setDeserializer(new SensorInputPBDeserializationSchema)
      .setStartingOffsets(OffsetsInitializer.earliest())
      .build()

The stream was built with

    env.fromSource(sensorSource, WatermarkStrategy.forMonotonousTimestamps(), "sensor events")

The SensorInputPBDeserializationSchema is a basic KafkaRecordDeserializer that does SensorInputPB.parseFrom(record.value()) and finally collector.collect(v)

From here on we're doing a keyed windowed aggregation with .keyBy(...).window(EventTimeSessionWindows.withGap(Time.seconds(60))).aggregate(new SensorEventAggregator)

L


________________________________
From: Till Rohrmann <tr...@apache.org>
Sent: Thursday, April 29, 2021 09:16
To: Lars Skjærven <la...@tv2.no>; Becket Qin <be...@gmail.com>
Cc: user@flink.apache.org <us...@flink.apache.org>
Subject: Re: KafkaSourceBuilder causing invalid negative offset on checkpointing

Hi Lars,

The KafkaSourceBuilder constructs the new KafkaSource, which has not been fully hardened in 1.12.2. In fact, it should not be documented yet. I think you are running into an instability/bug of this new source. The new Kafka source should be hardened a lot more in the 1.13.0 release.

Could you tell us exactly how you created the KafkaSource so that we can verify that this problem has been properly fixed in the 1.13.0 release? I am also pulling in Becket who is the original author of this connector. He might be able to tell you more.

Cheers,
Till

On Wed, Apr 28, 2021 at 10:36 AM Lars Skjærven <la...@tv2.no> wrote:
Hello,
I ran into some issues when using the new KafkaSourceBuilder (running Flink 1.12.2, Scala 2.12.13, on the Ververica Platform in a container with Java 8). Initially it generated warnings about the Kafka configuration, but the job was able to consume and produce messages.


The configuration 'client.id.prefix' was supplied but isn't a known config.
The configuration 'partition.discovery.interval.ms' was supplied but isn't a known config.


Finally the job crashed with a checkpointing error:


java.lang.Exception: Could not perform checkpoint 10 for operator Source: progress source (4/6)#9.
....
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 10 for operator Source: progress source (4/6)#9. Failure reason: Checkpoint was declined.
...
Caused by: java.lang.IllegalArgumentException: Invalid negative offset



After switching back to FlinkKafkaConsumer, the warnings about the Kafka config disappeared and the job was able to checkpoint successfully.

I'm wondering if the warnings and the errors are connected, and if there is a compatibility issue between the newer KafkaSourceBuilder and Kafka 2.5?

Thanks,
L


Re: KafkaSourceBuilder causing invalid negative offset on checkpointing

Posted by Till Rohrmann <tr...@apache.org>.
Thanks for the additional information, Lars. Could you maybe also share the
full stack traces of the errors you see when the checkpoint fails?

@Becket Qin <be...@gmail.com> is it a known issue with the new Kafka
sources trying to checkpoint negative offsets?

Cheers,
Till

On Thu, Apr 29, 2021 at 1:06 PM Lars Skjærven <la...@tv2.no> wrote:

> Thanks Till.
>
> Here is how we created the KafkaSource:
>
>     val sensorSource = KafkaSource.builder[SensorInput]()
>       .setBootstrapServers(myConfig.kafkaBrokers)
>       .setGroupId(myConfig.kafkaGroupId)
>       .setTopics(myConfig.kafkaTopicIn)
>       .setDeserializer(new SensorInputPBDeserializationSchema)
>       .setStartingOffsets(OffsetsInitializer.earliest())
>       .build()
>
> The stream was built with
>
>     env.fromSource(sensorSource, WatermarkStrategy.forMonotonousTimestamps(), "sensor events")
>
> The SensorInputPBDeserializationSchema is a basic KafkaRecordDeserializer
> that does SensorInputPB.parseFrom(record.value()) and finally
> collector.collect(v)
>
> From here on we're doing a keyed windowed aggregation with .keyBy(...).
> window(EventTimeSessionWindows.withGap(Time.seconds(60))).aggregate(new
> SensorEventAggregator)
>
> L
>
>
> ------------------------------
> *From:* Till Rohrmann <tr...@apache.org>
> *Sent:* Thursday, April 29, 2021 09:16
> *To:* Lars Skjærven <la...@tv2.no>; Becket Qin <
> becket.qin@gmail.com>
> *Cc:* user@flink.apache.org <us...@flink.apache.org>
> *Subject:* Re: KafkaSourceBuilder causing invalid negative offset on
> checkpointing
>
> Hi Lars,
>
> The KafkaSourceBuilder constructs the new KafkaSource, which has not been
> fully hardened in 1.12.2. In fact, it should not be documented yet. I think
> you are running into an instability/bug of this new source. The new Kafka
> source should be hardened a lot more in the 1.13.0 release.
>
> Could you tell us exactly how you created the KafkaSource so that we can
> verify that this problem has been properly fixed in the 1.13.0 release? I
> am also pulling in Becket who is the original author of this connector. He
> might be able to tell you more.
>
> Cheers,
> Till
>
> On Wed, Apr 28, 2021 at 10:36 AM Lars Skjærven <la...@tv2.no>
> wrote:
>
> Hello,
> I ran into some issues when using the new KafkaSourceBuilder (running
> Flink 1.12.2, Scala 2.12.13, on the Ververica Platform in a container with
> Java 8). Initially it generated warnings about the Kafka configuration, but
> the job was able to consume and produce messages.
>
> The configuration 'client.id.prefix' was supplied but isn't a known config.	
> The configuration 'partition.discovery.interval.ms' was supplied but isn't a known config.	
>
>
> Finally the job crashed with a checkpointing error:
>
> java.lang.Exception: Could not perform checkpoint 10 for operator Source: progress source (4/6)#9.
> ....
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 10 for operator Source: progress source (4/6)#9. Failure reason: Checkpoint was declined.
> ...
> Caused by: java.lang.IllegalArgumentException: Invalid negative offset
>
>
>
> After switching back to FlinkKafkaConsumer, the warnings about the Kafka
> config disappeared and the job was able to checkpoint successfully.
>
> I'm wondering if the warnings and the errors are connected, and if there
> is a compatibility issue between the newer KafkaSourceBuilder and Kafka
> 2.5?
>
> Thanks,
> L
>
>

Re: KafkaSourceBuilder causing invalid negative offset on checkpointing

Posted by Lars Skjærven <la...@tv2.no>.
Thanks Till.

Here is how we created the KafkaSource:

    val sensorSource = KafkaSource.builder[SensorInput]()
      .setBootstrapServers(myConfig.kafkaBrokers)
      .setGroupId(myConfig.kafkaGroupId)
      .setTopics(myConfig.kafkaTopicIn)
      .setDeserializer(new SensorInputPBDeserializationSchema)
      .setStartingOffsets(OffsetsInitializer.earliest())
      .build()

The stream was built with

    env.fromSource(sensorSource, WatermarkStrategy.forMonotonousTimestamps(), "sensor events")

The SensorInputPBDeserializationSchema is a basic KafkaRecordDeserializer that does SensorInputPB.parseFrom(record.value()) and finally collector.collect(v)
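
Roughly like this (a sketch of the idea only; error handling is omitted and the Protobuf-to-domain conversion at the end is a hypothetical stand-in for whatever mapping we actually do):

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializer
    import org.apache.flink.util.Collector
    import org.apache.kafka.clients.consumer.ConsumerRecord

    class SensorInputPBDeserializationSchema extends KafkaRecordDeserializer[SensorInput] {

      override def deserialize(
          record: ConsumerRecord[Array[Byte], Array[Byte]],
          collector: Collector[SensorInput]): Unit = {
        // Parse the raw Kafka value with the Protobuf-generated parser ...
        val msg = SensorInputPB.parseFrom(record.value())
        // ... and emit the result (hypothetical PB-to-domain conversion).
        collector.collect(SensorInput.fromProto(msg))
      }

      override def getProducedType: TypeInformation[SensorInput] =
        TypeInformation.of(classOf[SensorInput])
    }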

From here on we're doing a keyed windowed aggregation with .keyBy(...).window(EventTimeSessionWindows.withGap(Time.seconds(60))).aggregate(new SensorEventAggregator)
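
Spelled out, the downstream part is roughly the following (again only a sketch: the key selector and the counting accumulator are placeholders, not the real SensorEventAggregator logic):

    import org.apache.flink.api.common.functions.AggregateFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
    import org.apache.flink.streaming.api.windowing.time.Time

    // Placeholder aggregator: counts the events in each session window.
    class SensorEventAggregator extends AggregateFunction[SensorInput, Long, Long] {
      override def createAccumulator(): Long = 0L
      override def add(value: SensorInput, acc: Long): Long = acc + 1
      override def merge(a: Long, b: Long): Long = a + b
      override def getResult(acc: Long): Long = acc
    }

    val aggregated = sensorStream                  // the stream from env.fromSource above
      .keyBy(_.sensorId)                           // hypothetical key field
      .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
      .aggregate(new SensorEventAggregator)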

L


________________________________
From: Till Rohrmann <tr...@apache.org>
Sent: Thursday, April 29, 2021 09:16
To: Lars Skjærven <la...@tv2.no>; Becket Qin <be...@gmail.com>
Cc: user@flink.apache.org <us...@flink.apache.org>
Subject: Re: KafkaSourceBuilder causing invalid negative offset on checkpointing

Hi Lars,

The KafkaSourceBuilder constructs the new KafkaSource, which has not been fully hardened in 1.12.2. In fact, it should not be documented yet. I think you are running into an instability/bug of this new source. The new Kafka source should be hardened a lot more in the 1.13.0 release.

Could you tell us exactly how you created the KafkaSource so that we can verify that this problem has been properly fixed in the 1.13.0 release? I am also pulling in Becket who is the original author of this connector. He might be able to tell you more.

Cheers,
Till

On Wed, Apr 28, 2021 at 10:36 AM Lars Skjærven <la...@tv2.no> wrote:
Hello,
I ran into some issues when using the new KafkaSourceBuilder (running Flink 1.12.2, Scala 2.12.13, on the Ververica Platform in a container with Java 8). Initially it generated warnings about the Kafka configuration, but the job was able to consume and produce messages.


The configuration 'client.id.prefix' was supplied but isn't a known config.
The configuration 'partition.discovery.interval.ms' was supplied but isn't a known config.


Finally the job crashed with a checkpointing error:


java.lang.Exception: Could not perform checkpoint 10 for operator Source: progress source (4/6)#9.
....
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 10 for operator Source: progress source (4/6)#9. Failure reason: Checkpoint was declined.
...
Caused by: java.lang.IllegalArgumentException: Invalid negative offset



After switching back to FlinkKafkaConsumer, the warnings about the Kafka config disappeared and the job was able to checkpoint successfully.

I'm wondering if the warnings and the errors are connected, and if there is a compatibility issue between the newer KafkaSourceBuilder and Kafka 2.5?

Thanks,
L


Re: KafkaSourceBuilder causing invalid negative offset on checkpointing

Posted by Till Rohrmann <tr...@apache.org>.
Hi Lars,

The KafkaSourceBuilder constructs the new KafkaSource, which has not been
fully hardened in 1.12.2. In fact, it should not be documented yet. I think
you are running into an instability/bug of this new source. The new Kafka
source should be hardened a lot more in the 1.13.0 release.

Could you tell us exactly how you created the KafkaSource so that we can
verify that this problem has been properly fixed in the 1.13.0 release? I
am also pulling in Becket who is the original author of this connector. He
might be able to tell you more.

Cheers,
Till

On Wed, Apr 28, 2021 at 10:36 AM Lars Skjærven <la...@tv2.no> wrote:

> Hello,
> I ran into some issues when using the new KafkaSourceBuilder (running
> Flink 1.12.2, Scala 2.12.13, on the Ververica Platform in a container with
> Java 8). Initially it generated warnings about the Kafka configuration, but
> the job was able to consume and produce messages.
>
> The configuration 'client.id.prefix' was supplied but isn't a known config.	
> The configuration 'partition.discovery.interval.ms' was supplied but isn't a known config.	
>
>
> Finally the job crashed with a checkpointing error:
>
> java.lang.Exception: Could not perform checkpoint 10 for operator Source: progress source (4/6)#9.
> ....
> Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 10 for operator Source: progress source (4/6)#9. Failure reason: Checkpoint was declined.
> ...
> Caused by: java.lang.IllegalArgumentException: Invalid negative offset
>
>
>
> After switching back to FlinkKafkaConsumer, the warnings about the Kafka
> config disappeared and the job was able to checkpoint successfully.
>
> I'm wondering if the warnings and the errors are connected, and if there
> is a compatibility issue between the newer KafkaSourceBuilder and Kafka
> 2.5?
>
> Thanks,
> L
>
>