Posted to user@flink.apache.org by santosh joshi <sa...@gmail.com> on 2022/02/12 15:39:15 UTC

Need help on implementing a custom `snapshotState` in KafkaSource & KafkaSourceReader

We are migrating to KafkaSource
<https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java>
from FlinkKafkaConsumer
<https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java>.
We have disabled auto-commit of offsets and are instead committing them
manually to an external store.

We override FlinkKafkaConsumer
<https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer.java>
and, on an overridden instance of KafkaFetcher
<https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaFetcher.java>,
we store the *offsets* in an external store by overriding
doCommitInternalOffsetsToKafka:

@Override
protected void doCommitInternalOffsetsToKafka(
        Map<KafkaTopicPartition, Long> offsets,
        @Nonnull KafkaCommitCallback commitCallback) throws Exception {
    // Store offsets in S3
}
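In isolation, the legacy pattern can be modeled roughly as below. This is a minimal sketch, not the real Flink API: `CommitCallback`, `ExternalOffsetCommitter`, and the `String`-keyed offset map are simplified stand-ins for `KafkaCommitCallback`, the overridden fetcher, and `Map<KafkaTopicPartition, Long>`.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for Flink's KafkaCommitCallback (hypothetical).
interface CommitCallback {
    void onSuccess();
    void onException(Throwable cause);
}

// Models the overridden fetcher: offsets go to an external store
// instead of Kafka, then the commit callback is notified, mirroring
// what a doCommitInternalOffsetsToKafka override would do.
class ExternalOffsetCommitter {
    final Map<String, Long> externalStore = new HashMap<>(); // stand-in for S3

    void commitOffsets(Map<String, Long> offsets, CommitCallback callback) {
        try {
            externalStore.putAll(offsets); // "store offsets in S3"
            callback.onSuccess();
        } catch (RuntimeException e) {
            callback.onException(e);
        }
    }
}
```

The callback contract matters: Flink's metrics and logging hook into the success/failure notifications, so an override that persists offsets externally should still invoke the callback.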

Now, in order to migrate, we tried copying/overriding KafkaSource,
KafkaSourceBuilder, and KafkaSourceReader, but this produces a lot of
redundant code, which does not look correct.

In the custom KafkaSourceReader I tried overriding snapshotState:

@Override
public List<KafkaPartitionSplit> snapshotState(long checkpointId) {
    // Custom logic to store offsets in S3
    return super.snapshotState(checkpointId);
}
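The idea of piggybacking on the parent's snapshot can be shown with a self-contained sketch. `PartitionSplit`, `BaseReader`, and `OffsetTrackingReader` are hypothetical, simplified stand-ins for `KafkaPartitionSplit` and `KafkaSourceReader`; only the override pattern itself is the point.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in for KafkaPartitionSplit: a partition plus its current offset.
class PartitionSplit {
    final String topicPartition;
    final long currentOffset;
    PartitionSplit(String tp, long offset) {
        this.topicPartition = tp;
        this.currentOffset = offset;
    }
}

// Stand-in for the base reader: snapshotState returns the splits
// (with their offsets) that Flink will put into the checkpoint.
class BaseReader {
    private final List<PartitionSplit> assigned;
    BaseReader(List<PartitionSplit> assigned) { this.assigned = assigned; }
    List<PartitionSplit> snapshotState(long checkpointId) { return assigned; }
}

// The override: let the parent build the snapshot, persist the offsets
// it contains to the external store, then return it unchanged.
class OffsetTrackingReader extends BaseReader {
    final Map<String, Long> externalStore = new HashMap<>(); // stand-in for S3
    OffsetTrackingReader(List<PartitionSplit> assigned) { super(assigned); }

    @Override
    List<PartitionSplit> snapshotState(long checkpointId) {
        List<PartitionSplit> splits = super.snapshotState(checkpointId);
        for (PartitionSplit s : splits) {
            externalStore.put(s.topicPartition, s.currentOffset);
        }
        return splits;
    }
}
```

One caveat worth noting: snapshotState runs when the checkpoint is *taken*, not when it *completes*, so offsets written here may belong to a checkpoint that is later aborted; deferring the external write to a checkpoint-completion notification avoids that.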

Is this correct, or is there another way to achieve the same?


I have asked a similar question on Stack Overflow
<https://stackoverflow.com/questions/71092656/how-to-implement-customsnapshotstate-in-kafkasource-kafkasourcereader>



Regards,

Santosh

Re: Need help on implementing a custom `snapshotState` in KafkaSource & KafkaSourceReader

Posted by Niklas Semmler <ni...@ververica.com>.
Hi Santosh,

It’s best to avoid cross-posting. Let’s keep the discussion to SO.

Best regards,
Niklas
