Posted to jira@kafka.apache.org by "Sagar Rao (Jira)" <ji...@apache.org> on 2023/01/17 18:11:00 UTC

[jira] [Commented] (KAFKA-14625) CheckpointFile read and write API consistency

    [ https://issues.apache.org/jira/browse/KAFKA-14625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17677875#comment-17677875 ] 

Sagar Rao commented on KAFKA-14625:
-----------------------------------

hey [~satish.duggana], I am assuming you plan to work on this? Nonetheless, I took a look at the 2 APIs and the places from which they are called. The main difference that I see is that `CommittedOffsetsFile` works with a map, while the other 2 Scala classes, `OffsetCheckpointFile` and `LeaderEpochCheckpointFile`, interoperate with Java lists and Scala sequences. So the main difference is in `CommittedOffsetsFile`, which operates on the entrySet of `partitionToConsumedOffsets`. I am thinking we could maintain another list of TopicPartitionOffsets objects storing the topic/partition offsets, while the map `partitionToConsumedOffsets` stores the same TopicPartitionOffsets objects keyed by partition (same key as today).

We can keep updating the list as and when entries are added to or removed from the map, and when we want to sync, we can pass the list as is. A very crude sketch of the idea in this Java program:

{code:java}
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

public class TestEquality {

    static class TopicPartitionOffsets {
        Integer tp;
        Long offset;

        public TopicPartitionOffsets(Integer tp, Long offset) {
            this.tp = tp;
            this.offset = offset;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            TopicPartitionOffsets that = (TopicPartitionOffsets) o;
            return Objects.equals(tp, that.tp) && Objects.equals(offset, that.offset);
        }

        @Override
        public int hashCode() {
            return Objects.hash(tp, offset);
        }

        @Override
        public String toString() {
            return "TopicPartitionOffsets{" +
                    "tp=" + tp +
                    ", offset=" + offset +
                    '}';
        }
    }

    public static void main(String[] args) {
        Map<Integer, TopicPartitionOffsets> partitionToConsumedOffsets = new ConcurrentHashMap<>();
        LinkedList<TopicPartitionOffsets> topicPartitionOffsets = new LinkedList<>();

        // The same instance is referenced from both the map and the list.
        TopicPartitionOffsets tp1 = new TopicPartitionOffsets(1, 100L);
        partitionToConsumedOffsets.put(1, tp1);

        topicPartitionOffsets.add(tp1);

        System.out.println("partitionToConsumedOffsets:" + partitionToConsumedOffsets + ", topicPartitionOffsets:" + topicPartitionOffsets);

        // Mutating through the original reference is visible in both structures.
        tp1.offset = 200L;

        System.out.println("partitionToConsumedOffsets:" + partitionToConsumedOffsets + ", topicPartitionOffsets:" + topicPartitionOffsets);

        // Mutating via the reference obtained from the map is equally visible in the list.
        TopicPartitionOffsets tp2 = partitionToConsumedOffsets.get(1);
        tp2.offset = 300L;

        System.out.println("partitionToConsumedOffsets:" + partitionToConsumedOffsets + ", topicPartitionOffsets:" + topicPartitionOffsets);

        // Removal relies on equals(), so the same logical entry leaves both structures.
        topicPartitionOffsets.remove(tp2);
        partitionToConsumedOffsets.remove(1);

        System.out.println("partitionToConsumedOffsets:" + partitionToConsumedOffsets + ", topicPartitionOffsets:" + topicPartitionOffsets);

    }

}
{code}
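Running this prints the two structures in lock-step: because the map and the list hold references to the same TopicPartitionOffsets instance, the offset updates (200, then 300) show up in both, and after the two removals both print empty.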
I am using a LinkedList here so that removing from the list becomes easier (removal by value still takes a linear scan, though). Also, the two updates on the data structures should happen in an atomic fashion, i.e. if one of them fails, the other should not be applied either.
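To make that atomicity point concrete, here is a minimal sketch, assuming a hypothetical ConsumedOffsetsStore helper (not existing Kafka code), that keeps the map and the list in step by guarding every mutation with a single lock:
{code:java}
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not existing Kafka code): keeps a map and a list of the
// same entries in step by guarding every mutation with a single lock, so readers
// never observe one structure updated without the other.
class ConsumedOffsetsStore<K, V> {
    private final Map<K, V> byKey = new ConcurrentHashMap<>();
    private final LinkedList<V> ordered = new LinkedList<>();

    synchronized void put(K key, V entry) {
        V previous = byKey.put(key, entry);
        if (previous != null) {
            ordered.remove(previous); // replace rather than duplicate
        }
        ordered.add(entry);
    }

    synchronized void remove(K key) {
        V removed = byKey.remove(key);
        if (removed != null) {
            ordered.remove(removed);
        }
    }

    // Copy handed to the checkpoint writer, so a sync does not race with updates.
    synchronized List<V> snapshotForSync() {
        return new LinkedList<>(ordered);
    }
}
{code}
With the classes above this would be a ConsumedOffsetsStore<Integer, TopicPartitionOffsets>, and snapshotForSync() is what would be handed to the checkpoint write.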

> CheckpointFile read and write API consistency 
> ----------------------------------------------
>
>                 Key: KAFKA-14625
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14625
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core
>            Reporter: Satish Duggana
>            Priority: Major
>
> `CheckpointFile` has the below read and write APIs: write expects a Collection of items, but read returns a List of elements. It is better to look into these APIs and their usages and see whether consistency can be brought without introducing any extra collection conversions.
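
For context, a minimal sketch of the mismatch the issue describes and one possible harmonized shape (generic signatures assumed for illustration, not the actual Kafka declarations):
{code:java}
import java.io.IOException;
import java.util.Collection;
import java.util.List;

// Asymmetry described in the issue (signatures assumed for illustration):
interface CheckpointReadWrite<T> {
    void write(Collection<T> entries) throws IOException; // accepts any Collection
    List<T> read() throws IOException;                    // returns a concrete List
}

// One possible consistent shape: Collection on both sides, so callers are not
// forced into extra list conversions in either direction.
interface ConsistentCheckpointReadWrite<T> {
    void write(Collection<T> entries) throws IOException;
    Collection<T> read() throws IOException;
}
{code}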



--
This message was sent by Atlassian Jira
(v8.20.10#820010)