Posted to jira@kafka.apache.org by "lianetm (via GitHub)" <gi...@apache.org> on 2023/06/21 16:00:17 UTC

[GitHub] [kafka] lianetm opened a new pull request, #13898: KAFKA-14966; [2/N] Extract OffsetFetcher reusable logic

lianetm opened a new pull request, #13898:
URL: https://github.com/apache/kafka/pull/13898

   This is a follow-up to the initial OffsetFetcher refactoring, extracting reusable logic needed for the new consumer implementation (initial refactoring merged with [PR-13815](https://github.com/apache/kafka/pull/13815)).
   
   As with the initial refactoring, this PR makes no changes to the existing logic; it only extracts functions or pieces of logic.
   
   There were no individual tests for the extracted functions, so no tests were migrated.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] junrao merged pull request #13898: KAFKA-14966; [2/N] Extract OffsetFetcher reusable logic

Posted by "junrao (via GitHub)" <gi...@apache.org>.
junrao merged PR #13898:
URL: https://github.com/apache/kafka/pull/13898




[GitHub] [kafka] kirktrue commented on pull request #13898: KAFKA-14966; [2/N] Extract OffsetFetcher reusable logic

Posted by "kirktrue (via GitHub)" <gi...@apache.org>.
kirktrue commented on PR #13898:
URL: https://github.com/apache/kafka/pull/13898#issuecomment-1603388253

   > @philipnee @vvcephei Can you tag this as `ctr`, please?
   
   Thanks @dajac!




[GitHub] [kafka] lianetm commented on a diff in pull request #13898: KAFKA-14966; [2/N] Extract OffsetFetcher reusable logic

Posted by "lianetm (via GitHub)" <gi...@apache.org>.
lianetm commented on code in PR #13898:
URL: https://github.com/apache/kafka/pull/13898#discussion_r1252178897


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java:
##########
@@ -261,6 +276,73 @@ void updateSubscriptionState(Map<TopicPartition, OffsetFetcherUtils.ListOffsetDa
         }
     }
 
+    OffsetResetStrategy timestampToOffsetResetStrategy(long timestamp) {

Review Comment:
   Definitely, done for this one and a few others in a similar situation.





[GitHub] [kafka] kirktrue commented on pull request #13898: KAFKA-14966; [2/N] Extract OffsetFetcher reusable logic

Posted by "kirktrue (via GitHub)" <gi...@apache.org>.
kirktrue commented on PR #13898:
URL: https://github.com/apache/kafka/pull/13898#issuecomment-1603067666

   @philipnee @vvcephei Can you tag this as `ctr`, please?




[GitHub] [kafka] junrao commented on a diff in pull request #13898: KAFKA-14966; [2/N] Extract OffsetFetcher reusable logic

Posted by "junrao (via GitHub)" <gi...@apache.org>.
junrao commented on code in PR #13898:
URL: https://github.com/apache/kafka/pull/13898#discussion_r1245887623


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java:
##########
@@ -117,11 +104,6 @@ else if (timestamp == ListOffsetsRequest.LATEST_TIMESTAMP)
      *                                                                         and one or more partitions aren't awaiting a seekToBeginning() or seekToEnd().
      */
     public void resetPositionsIfNeeded() {
-        // Raise exception from previous offset fetch if there is one
-        RuntimeException exception = cachedListOffsetsException.getAndSet(null);

Review Comment:
   This seems to be a change in existing logic and not just a refactoring. Is this change expected?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java:
##########
@@ -198,6 +208,11 @@ void validatePositionsOnMetadataChange() {
     }
 
     Map<TopicPartition, Long> getOffsetResetTimestamp() {
+        // Raise exception from previous offset fetch if there is one
+        RuntimeException exception = cachedListOffsetsException.getAndSet(null);

Review Comment:
   This seems a change in existing logic and not just a refactoring. Is this change expected?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java:
##########
@@ -261,6 +276,73 @@ void updateSubscriptionState(Map<TopicPartition, OffsetFetcherUtils.ListOffsetDa
         }
     }
 
+    OffsetResetStrategy timestampToOffsetResetStrategy(long timestamp) {

Review Comment:
   Should this be static? Ditto for a few other methods moved into this class.
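
To illustrate the question above, here is a minimal sketch of what a static version of such a helper could look like. The enum and the two timestamp constants are local stand-ins (assumptions for this sketch) for Kafka's `OffsetResetStrategy` and the `ListOffsetsRequest.EARLIEST_TIMESTAMP` / `LATEST_TIMESTAMP` sentinels, and the method body is a guess at the mapping, not the code from the PR. The point is only that the method reads no instance state, so nothing prevents it from being `static`.

```java
public class OffsetResetSketch {
    // Stand-in for Kafka's OffsetResetStrategy enum (assumption for this sketch).
    enum OffsetResetStrategy { EARLIEST, LATEST, NONE }

    // Stand-ins for the ListOffsetsRequest sentinel timestamps (assumed values).
    static final long EARLIEST_TIMESTAMP = -2L;
    static final long LATEST_TIMESTAMP = -1L;

    // Pure function of its argument: no fields are read, so it can be static.
    static OffsetResetStrategy timestampToOffsetResetStrategy(long timestamp) {
        if (timestamp == EARLIEST_TIMESTAMP)
            return OffsetResetStrategy.EARLIEST;
        else if (timestamp == LATEST_TIMESTAMP)
            return OffsetResetStrategy.LATEST;
        else
            return null; // any other timestamp has no matching reset strategy
    }

    public static void main(String[] args) {
        System.out.println(timestampToOffsetResetStrategy(EARLIEST_TIMESTAMP));
        System.out.println(timestampToOffsetResetStrategy(LATEST_TIMESTAMP));
        System.out.println(timestampToOffsetResetStrategy(1687363217000L));
    }
}
```

Callers and tests can then invoke the helper without constructing the utility class.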





[GitHub] [kafka] lianetm commented on a diff in pull request #13898: KAFKA-14966; [2/N] Extract OffsetFetcher reusable logic

Posted by "lianetm (via GitHub)" <gi...@apache.org>.
lianetm commented on code in PR #13898:
URL: https://github.com/apache/kafka/pull/13898#discussion_r1252190359


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java:
##########
@@ -198,6 +208,11 @@ void validatePositionsOnMetadataChange() {
     }
 
     Map<TopicPartition, Long> getOffsetResetTimestamp() {
+        // Raise exception from previous offset fetch if there is one
+        RuntimeException exception = cachedListOffsetsException.getAndSet(null);

Review Comment:
   This keeps the existing logic for `resetPositionsIfNeeded`; it only moves the exception check into the common function `getOffsetResetTimestamp` (so that it can be reused), which is already called from the reset (and only from there for now).
   
   Existing logic checks the exception and then calls `getOffsetResetTimestamp` [here](https://github.com/lianetm/kafka/blob/d5dafe22fed244d25cca8839c41221fab87d367e/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java#L119C1-L125C104).
   New logic performs the same check, but as part of the `getOffsetResetTimestamp` call made from the reset.
   
   There is also a test `testRestOffsetsAuthorizationFailure` for this logic, that passes with the new changes, ensuring [here](https://github.com/apache/kafka/blob/701f924352da1225a881f0f78f19ddf51485030a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java#L579-L584) that the behaviour remains as it was before.
   Please let me know if I'm missing some detail here...






[GitHub] [kafka] lianetm commented on a diff in pull request #13898: KAFKA-14966; [2/N] Extract OffsetFetcher reusable logic

Posted by "lianetm (via GitHub)" <gi...@apache.org>.
lianetm commented on code in PR #13898:
URL: https://github.com/apache/kafka/pull/13898#discussion_r1252190359


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherUtils.java:
##########
@@ -198,6 +208,11 @@ void validatePositionsOnMetadataChange() {
     }
 
     Map<TopicPartition, Long> getOffsetResetTimestamp() {
+        // Raise exception from previous offset fetch if there is one
+        RuntimeException exception = cachedListOffsetsException.getAndSet(null);

Review Comment:
   This keeps the existing logic for `resetPositionsIfNeeded`; it only moves the exception check into the common function `getOffsetResetTimestamp` (so that it can be reused), which is already called from the reset (and only from there for now).
   
   Existing logic checks the exception and then calls `getOffsetResetTimestamp`:
   
       public void resetPositionsIfNeeded() {
           // Raise exception from previous offset fetch if there is one
           RuntimeException exception = cachedListOffsetsException.getAndSet(null);
           if (exception != null)
               throw exception;
   
           Map<TopicPartition, Long> offsetResetTimestamps = offsetFetcherUtils.getOffsetResetTimestamp();
           
   New logic performs the same check but as part of the `getOffsetResetTimestamp`.
   
   There is also a test `testRestOffsetsAuthorizationFailure` for this logic, that passes with the new changes, ensuring [here](https://github.com/apache/kafka/blob/701f924352da1225a881f0f78f19ddf51485030a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java#L579-L584) that the behaviour remains as it was before.
   Please let me know if I'm missing some detail here...
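
The argument above can be sketched in isolation as follows. This is a simplified model, not the code from the PR: the class name, the `recordFailure` helper, the `String` partition keys, and the empty result map are all hypothetical stand-ins. It only shows that when the cached-exception check lives inside `getOffsetResetTimestamp`, a caller such as `resetPositionsIfNeeded` still raises the cached exception exactly once, because `getAndSet(null)` clears the slot as it reads it.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

public class CachedExceptionSketch {
    // Slot where an asynchronous offset fetch would record its failure.
    private final AtomicReference<RuntimeException> cachedListOffsetsException =
            new AtomicReference<>();

    // Hypothetical helper standing in for the async failure callback.
    void recordFailure(RuntimeException e) {
        cachedListOffsetsException.set(e);
    }

    // Shared function: the exception check now happens here.
    Map<String, Long> getOffsetResetTimestamp() {
        // Raise exception from previous offset fetch if there is one,
        // clearing it atomically so it is thrown at most once.
        RuntimeException exception = cachedListOffsetsException.getAndSet(null);
        if (exception != null)
            throw exception;
        return Collections.emptyMap(); // placeholder for the real partition lookup
    }

    // Caller: no local check is needed, the call below performs it first.
    void resetPositionsIfNeeded() {
        Map<String, Long> offsetResetTimestamps = getOffsetResetTimestamp();
        if (offsetResetTimestamps.isEmpty())
            return;
        // ... a real implementation would send the list-offsets requests here
    }

    public static void main(String[] args) {
        CachedExceptionSketch sketch = new CachedExceptionSketch();
        sketch.recordFailure(new IllegalStateException("simulated authorization failure"));
        try {
            sketch.resetPositionsIfNeeded();
        } catch (IllegalStateException e) {
            System.out.println("first call raised: " + e.getMessage());
        }
        sketch.resetPositionsIfNeeded(); // slot was cleared, so this returns normally
        System.out.println("second call completed");
    }
}
```

The equivalence holds only while the check is the first statement of the shared function and the reset is its only caller; a second caller added later would also consume the cached exception.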





[GitHub] [kafka] lianetm commented on pull request #13898: KAFKA-14966; [2/N] Extract OffsetFetcher reusable logic

Posted by "lianetm (via GitHub)" <gi...@apache.org>.
lianetm commented on PR #13898:
URL: https://github.com/apache/kafka/pull/13898#issuecomment-1620645056

   Thanks for the comments @junrao. All addressed above. I double-checked the test failures: locally all tests pass. In the CI run some tests fail, but the failures are unrelated.

