Posted to jira@kafka.apache.org by "Sagar Rao (Jira)" <ji...@apache.org> on 2023/01/17 18:11:00 UTC
[jira] [Commented] (KAFKA-14625) CheckpointFile read and write API consistency
[ https://issues.apache.org/jira/browse/KAFKA-14625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17677875#comment-17677875 ]
Sagar Rao commented on KAFKA-14625:
-----------------------------------
hey [~satish.duggana], I am assuming you plan to work on this? Nonetheless, I took a look at the two APIs and the places from which they are called. The main difference I see is that `CommittedOffsetsFile` works with a map, while the other two Scala classes, `OffsetCheckpointFile` and `LeaderEpochCheckpointFile`, interoperate with Java lists and Scala sequences. So the main difference is in `CommittedOffsetsFile`, which operates on the entrySet of `partitionToConsumedOffsets`. I am wondering whether we could maintain an additional list of `TopicPartitionOffsets` objects that stores the topic/partition offsets, while the map `partitionToConsumedOffsets` stores the same `TopicPartitionOffsets` objects keyed by partition (same key as today).
We can keep updating the list as entries are added to or removed from the map, and when we want to sync, we can pass the list as is. A very crude sketch of the idea in this Java program:
{code:java}
import java.util.LinkedList;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

public class TestEquality {

    static class TopicPartitionOffsets {
        Integer tp;
        Long offset;

        public TopicPartitionOffsets(Integer tp, Long offset) {
            this.tp = tp;
            this.offset = offset;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            TopicPartitionOffsets that = (TopicPartitionOffsets) o;
            return Objects.equals(tp, that.tp) && Objects.equals(offset, that.offset);
        }

        @Override
        public int hashCode() {
            return Objects.hash(tp, offset);
        }

        @Override
        public String toString() {
            return "TopicPartitionOffsets{tp=" + tp + ", offset=" + offset + '}';
        }
    }

    public static void main(String[] args) {
        // Both structures hold references to the same objects, so a mutation
        // made through one is visible through the other.
        Map<Integer, TopicPartitionOffsets> partitionToConsumedOffsets = new ConcurrentHashMap<>();
        LinkedList<TopicPartitionOffsets> topicPartitionOffsets = new LinkedList<>();

        TopicPartitionOffsets tp1 = new TopicPartitionOffsets(1, 100L);
        partitionToConsumedOffsets.put(1, tp1);
        topicPartitionOffsets.add(tp1);
        System.out.println("partitionToConsumedOffsets:" + partitionToConsumedOffsets + ", topicPartitionOffsets:" + topicPartitionOffsets);

        // Mutating the shared object updates both views.
        tp1.offset = 200L;
        System.out.println("partitionToConsumedOffsets:" + partitionToConsumedOffsets + ", topicPartitionOffsets:" + topicPartitionOffsets);

        TopicPartitionOffsets tp2 = partitionToConsumedOffsets.get(1);
        tp2.offset = 300L;
        System.out.println("partitionToConsumedOffsets:" + partitionToConsumedOffsets + ", topicPartitionOffsets:" + topicPartitionOffsets);

        // Removal works through the shared reference as well.
        topicPartitionOffsets.remove(tp2);
        partitionToConsumedOffsets.remove(1);
        System.out.println("partitionToConsumedOffsets:" + partitionToConsumedOffsets + ", topicPartitionOffsets:" + topicPartitionOffsets);
    }
}
{code}
I am using a LinkedList here so that removing from the list is easier (though it adds extra time complexity for the lookup). Also, the two updates on the data structures should happen atomically, i.e. either both succeed or neither is applied.
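To illustrate the atomicity point, here is a minimal sketch of a wrapper that keeps the two structures in step under a single lock (the class name and shape are my own for illustration, not actual Kafka code):

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

// Hypothetical wrapper: the map gives O(1) lookup by partition, while the list
// preserves a stable iteration order for syncing to the checkpoint file. Both
// updates happen under one lock, so either both structures change or neither does.
public class OffsetSync {
    static final class Entry {
        final int partition;
        long offset;
        Entry(int partition, long offset) { this.partition = partition; this.offset = offset; }
    }

    private final Map<Integer, Entry> byPartition = new HashMap<>();
    private final LinkedList<Entry> ordered = new LinkedList<>();

    // Add or update; both structures are touched under the same lock.
    public synchronized void put(int partition, long offset) {
        Entry e = byPartition.get(partition);
        if (e == null) {
            e = new Entry(partition, offset);
            byPartition.put(partition, e);
            ordered.add(e);          // the same object is shared by both structures
        } else {
            e.offset = offset;       // visible through the list as well
        }
    }

    public synchronized void remove(int partition) {
        Entry e = byPartition.remove(partition);
        if (e != null) {
            ordered.remove(e);       // identity-based removal via the shared reference
        }
    }

    // Snapshot for syncing: the list contents are handed over as-is.
    public synchronized List<long[]> snapshot() {
        List<long[]> out = new ArrayList<>();
        for (Entry e : ordered) out.add(new long[]{e.partition, e.offset});
        return out;
    }
}
{code}

Under this scheme a failed map update never leaves a dangling list entry, because both mutations are guarded by the same monitor.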
> CheckpointFile read and write API consistency
> ----------------------------------------------
>
> Key: KAFKA-14625
> URL: https://issues.apache.org/jira/browse/KAFKA-14625
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Reporter: Satish Duggana
> Priority: Major
>
> `CheckpointFile` has the below read and write APIs: write expects a Collection of items, but read returns a List of elements. It is better to look into these APIs and their usages and see whether consistency can be brought without introducing any extra collection conversions.
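For reference, a symmetric read/write pair could use Collection on both sides, roughly like this (a hypothetical in-memory stand-in, not the actual CheckpointFile API):

{code:java}
import java.util.ArrayList;
import java.util.Collection;

// Hypothetical stand-in illustrating a symmetric API: write accepts a
// Collection and read returns a Collection, so neither direction forces a
// List conversion on the caller.
public class InMemoryCheckpoint<T> {
    private Collection<T> entries = new ArrayList<>();

    public synchronized void write(Collection<T> newEntries) {
        entries = new ArrayList<>(newEntries);  // defensive copy, like a file rewrite
    }

    public synchronized Collection<T> read() {
        return new ArrayList<>(entries);
    }
}
{code}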
--
This message was sent by Atlassian Jira
(v8.20.10#820010)