Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2023/01/03 18:00:08 UTC

[GitHub] [kafka] C0urante commented on a diff in pull request #11818: KAFKA-12558: Do not prematurely mutate partiton state and provide con…

C0urante commented on code in PR #11818:
URL: https://github.com/apache/kafka/pull/11818#discussion_r1060825828


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -294,5 +302,9 @@ boolean update(long upstreamOffset, long downstreamOffset) {
             previousDownstreamOffset = downstreamOffset;
             return shouldSyncOffsets;
         }
+
+        void reset() {
+            shouldSyncOffsets = false;

Review Comment:
   I was wondering if we should also move the updates for the `lastSyncUpstreamOffset` and `lastSyncDownstreamOffset` fields to this method, but there don't appear to be any downsides to this approach given our current usages of `update` and `reset`. 👍



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##########
@@ -69,14 +69,19 @@ public MirrorSourceTask() {}
 
     // for testing
     MirrorSourceTask(KafkaConsumer<byte[], byte[]> consumer, MirrorSourceMetrics metrics, String sourceClusterAlias,
-                     ReplicationPolicy replicationPolicy, long maxOffsetLag, KafkaProducer<byte[], byte[]> producer) {
+                     ReplicationPolicy replicationPolicy, long maxOffsetLag, KafkaProducer<byte[], byte[]> producer,
+                     Semaphore outstandingOffsetSyncs, Map<TopicPartition, PartitionState> partitionStates,

Review Comment:
   I'm not sure we should be mocking the semaphore here, since it's a bit of an implementation detail and I've already proposed that we split it out into several different semaphores (one for each topic partition) in https://github.com/apache/kafka/pull/12366.
   
   Can we get the same testing coverage/guarantees by mocking out the offset sync producer instead, and selectively invoking/not invoking the callbacks passed to it in `send`?
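
A rough sketch of the idea, not code from the PR: a hand-rolled test double records the callbacks passed to `send` and lets the test complete them one at a time, so the semaphore is exercised only through its observable effect. `SyncSender`, `ManualSyncSender`, `ThrottledSyncer` and `trySync` are hypothetical names, and the acquire-on-send, release-in-callback behaviour is an assumption about how the task throttles offset syncs.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;

// Hypothetical stand-in for the offset sync producer's send(record, callback).
interface SyncSender {
    void send(String record, Consumer<Exception> callback);
}

// Test double: stores each callback instead of invoking it, so the test
// decides when (or whether) a given send "completes".
final class ManualSyncSender implements SyncSender {
    private final Deque<Consumer<Exception>> pending = new ArrayDeque<>();

    @Override
    public void send(String record, Consumer<Exception> callback) {
        pending.add(callback);
    }

    int pendingCount() {
        return pending.size();
    }

    // Completes the oldest outstanding send successfully (null means no error).
    void completeNext() {
        pending.remove().accept(null);
    }
}

// Hypothetical code under test. The semaphore stays private: tests observe it
// only through whether trySync accepts or rejects a record.
final class ThrottledSyncer {
    private final Semaphore outstanding;
    private final SyncSender sender;

    ThrottledSyncer(SyncSender sender, int maxOutstanding) {
        this.sender = sender;
        this.outstanding = new Semaphore(maxOutstanding);
    }

    // Returns true if the record was handed to the sender, false if the
    // limit on in-flight sends has been reached.
    boolean trySync(String record) {
        if (!outstanding.tryAcquire()) {
            return false;
        }
        // Assumed behaviour: the permit is returned once the send completes.
        sender.send(record, error -> outstanding.release());
        return true;
    }
}

final class CallbackControlDemo {
    public static void main(String[] args) {
        ManualSyncSender sender = new ManualSyncSender();
        ThrottledSyncer syncer = new ThrottledSyncer(sender, 1);

        boolean first = syncer.trySync("a");   // takes the only permit
        boolean second = syncer.trySync("b");  // rejected: callback not invoked yet
        sender.completeNext();                 // the test chooses to complete "a"
        boolean third = syncer.trySync("c");   // permit is available again

        if (!first || second || !third) {
            throw new AssertionError("unexpected throttling behaviour");
        }
        System.out.println("ok");
    }
}
```

With a double like this, the test constructor would take only the producer, and splitting the single semaphore into per-partition semaphores later would not require touching the tests.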



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java:
##########
@@ -81,15 +88,25 @@ public void testOffsetSync() {
         MirrorSourceTask.PartitionState partitionState = new MirrorSourceTask.PartitionState(50);
 
         assertTrue(partitionState.update(0, 100), "always emit offset sync on first update");
+        partitionState.reset();

Review Comment:
   Is it worth adding coverage to this case (and possibly `testZeroOffsetSync`) for when we invoke `update` repeatedly without invoking `reset`?
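
A minimal sketch of what that extra coverage might assert, written against a simplified stand-in rather than the real `MirrorSourceTask.PartitionState`. The class name `SketchPartitionState` and the four sync conditions are assumptions for illustration; only the field names, the sticky `shouldSyncOffsets` flag, and the `update`/`reset` split come from the diff and comments above.

```java
// Simplified, hypothetical stand-in for MirrorSourceTask.PartitionState.
// The conditions that trigger a sync are assumptions, not the PR's logic.
final class SketchPartitionState {
    private final long maxOffsetLag;
    private long previousUpstreamOffset = -1L;
    private long previousDownstreamOffset = -1L;
    private long lastSyncUpstreamOffset = -1L;
    private long lastSyncDownstreamOffset = -1L;
    private boolean shouldSyncOffsets;

    SketchPartitionState(long maxOffsetLag) {
        this.maxOffsetLag = maxOffsetLag;
    }

    // Returns true while an offset sync is pending. Once set, the flag
    // stays set across further updates until reset() is called.
    boolean update(long upstreamOffset, long downstreamOffset) {
        boolean firstUpdate = lastSyncUpstreamOffset == -1L;
        boolean lagExceeded = downstreamOffset - lastSyncDownstreamOffset >= maxOffsetLag;
        boolean upstreamGap = upstreamOffset - previousUpstreamOffset != 1L;
        boolean downstreamRewind = downstreamOffset < previousDownstreamOffset;
        if (firstUpdate || lagExceeded || upstreamGap || downstreamRewind) {
            lastSyncUpstreamOffset = upstreamOffset;
            lastSyncDownstreamOffset = downstreamOffset;
            shouldSyncOffsets = true;
        }
        previousUpstreamOffset = upstreamOffset;
        previousDownstreamOffset = downstreamOffset;
        return shouldSyncOffsets;
    }

    // Called once the pending sync has actually been emitted.
    void reset() {
        shouldSyncOffsets = false;
    }
}

final class RepeatedUpdateDemo {
    private static void check(boolean condition, String message) {
        if (!condition) {
            throw new AssertionError(message);
        }
    }

    public static void main(String[] args) {
        SketchPartitionState state = new SketchPartitionState(50);

        check(state.update(0, 100), "always emit offset sync on first update");
        // No reset() here: the sync is still pending, so the flag must stay set
        // even though this update alone would not trigger a sync.
        check(state.update(1, 101), "pending sync survives repeated updates");

        state.reset();
        check(!state.update(2, 102), "no sync once reset and within max lag");
        check(state.update(3, 152), "sync again once the lag reaches the limit");
        System.out.println("ok");
    }
}
```

The second assertion is the case in question: an update that would not request a sync on its own still returns true because the earlier request has not been cleared.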



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
