Posted to user@flink.apache.org by santosh joshi <sa...@gmail.com> on 2022/02/12 15:39:15 UTC
Need help on implementing custom `snapshotState` in KafkaSource & KafkaSourceReader
We are migrating to KafkaSource
<https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java>
from FlinkKafkaConsumer
<https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java>.
We have disabled auto-commit of offsets and are instead committing them
manually to an external store.
We override FlinkKafkaConsumer
<https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java>
and, on an overridden instance of KafkaFetcher
<https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher.java>,
we store the offsets in the external store by overriding
doCommitInternalOffsetsToKafka:
@Override
protected void doCommitInternalOffsetsToKafka(
        Map<KafkaTopicPartition, Long> offsets,
        @Nonnull KafkaCommitCallback commitCallback) throws Exception {
    // Store offsets in S3
}
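For reference, the hook above boils down to intercepting the per-partition offsets at commit time and writing them somewhere durable instead of (or alongside) committing them to Kafka. A minimal, framework-free sketch of that pattern follows; `OffsetStore`, `InMemoryOffsetStore`, and `commitOffsets` are hypothetical names for illustration, not Flink API, and the real store would be S3 rather than a map:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an external offset store such as S3 (not Flink API).
interface OffsetStore {
    void put(String topicPartition, long offset);
    Long get(String topicPartition);
}

class InMemoryOffsetStore implements OffsetStore {
    private final Map<String, Long> data = new HashMap<>();
    public void put(String tp, long offset) { data.put(tp, offset); }
    public Long get(String tp) { return data.get(tp); }
}

public class CommitHookSketch {
    // Mirrors the shape of doCommitInternalOffsetsToKafka: take a map of
    // partition -> offset and persist each entry to the external store.
    static void commitOffsets(Map<String, Long> offsets, OffsetStore store) {
        offsets.forEach(store::put);
    }

    public static void main(String[] args) {
        OffsetStore store = new InMemoryOffsetStore();
        Map<String, Long> offsets = new HashMap<>();
        offsets.put("orders-0", 42L);
        commitOffsets(offsets, store);
        System.out.println(store.get("orders-0")); // prints 42
    }
}
```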
Now, in order to migrate, we tried copying/overriding KafkaSource,
KafkaSourceBuilder, and KafkaSourceReader, but that results in a lot of
redundant code, which does not look correct.
In the custom KafkaSourceReader I tried overriding snapshotState:
@Override
public List<KafkaPartitionSplit> snapshotState(long checkpointId) {
    // custom logic to store offsets in S3
    return super.snapshotState(checkpointId);
}
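With the new source API, the analogous interception point is snapshotState on the reader: the framework calls it with the checkpoint id, and it returns the splits (which carry their current offsets) to be checkpointed. A framework-free sketch of that flow follows; `SplitState`, `SketchReader`, and the in-memory `externalStore` are hypothetical stand-ins for illustration, not Flink classes, and in the real override the method would end with super.snapshotState(checkpointId):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical split state: a partition name plus the next offset to read.
class SplitState {
    final String partition;
    long currentOffset;
    SplitState(String partition, long offset) {
        this.partition = partition;
        this.currentOffset = offset;
    }
}

public class SketchReader {
    private final List<SplitState> splits = new ArrayList<>();
    // Stand-in for an external store such as S3, keyed by checkpoint id.
    final Map<Long, Map<String, Long>> externalStore = new HashMap<>();

    void addSplit(SplitState s) { splits.add(s); }

    // Mirrors KafkaSourceReader#snapshotState(long): persist the current
    // offsets externally first, then hand the splits back to the framework.
    List<SplitState> snapshotState(long checkpointId) {
        Map<String, Long> offsets = new HashMap<>();
        for (SplitState s : splits) {
            offsets.put(s.partition, s.currentOffset);
        }
        externalStore.put(checkpointId, offsets);
        return splits;
    }

    public static void main(String[] args) {
        SketchReader reader = new SketchReader();
        reader.addSplit(new SplitState("orders-0", 7L));
        reader.snapshotState(1L);
        System.out.println(reader.externalStore.get(1L).get("orders-0")); // prints 7
    }
}
```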
Is this correct, or is there another way to achieve the same result?
I have asked a similar question on Stack Overflow:
<https://stackoverflow.com/questions/71092656/how-to-implement-customsnapshotstate-in-kafkasource-kafkasourcereader>
Regards,
Santosh
Re: Need help on implementing custom `snapshotState` in KafkaSource & KafkaSourceReader
Posted by Niklas Semmler <ni...@ververica.com>.
Hi Santosh,
It’s best to avoid cross-posting. Let’s keep the discussion on Stack Overflow.
Best regards,
Niklas