You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/06 19:18:51 UTC

[GitHub] [kafka] hachikuji opened a new pull request #8822: KAFKA-10113; Specify fetch offsets correctly in LogTruncationException

hachikuji opened a new pull request #8822:
URL: https://github.com/apache/kafka/pull/8822


   This patch fixes a bug in the constructor of `LogTruncationException`. We were passing the divergent offsets to the super constructor as the fetch offsets. There is no way to fix this without breaking compatibility, but the harm is probably minimal since this exception was not getting raised properly until KAFKA-9840 anyway.
   
   Note that I have also moved the check for unknown offset and epoch into `SubscriptionState`, which ensures that the partition is still awaiting validation and that the fetch offset hasn't changed. Finally, I made some minor improvements to the logging and exception messages to ensure that we always have the fetch offset and epoch as well as the divergent offset and epoch included.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #8822: KAFKA-10113; Specify fetch offsets correctly in LogTruncationException

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #8822:
URL: https://github.com/apache/kafka/pull/8822#discussion_r436387869



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##########
@@ -1050,4 +1062,43 @@ public String toString() {
                     '}';
         }
     }
+
+    public static class LogTruncation {
+        public final TopicPartition topicPartition;
+        public final FetchPosition fetchPosition;
+        public final Optional<OffsetAndMetadata> divergentOffsetOpt;
+
+        public LogTruncation(TopicPartition topicPartition,
+                             FetchPosition fetchPosition,
+                             Optional<OffsetAndMetadata> divergentOffsetOpt) {
+            this.topicPartition = topicPartition;
+            this.fetchPosition = fetchPosition;

Review comment:
       Out of curiosity, why `fetchPosition` specifically and not the other fields?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #8822: KAFKA-10113; Specify fetch offsets correctly in LogTruncationException

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #8822:
URL: https://github.com/apache/kafka/pull/8822#discussion_r436318626



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -852,14 +835,28 @@ public void onFailure(RuntimeException e) {
                     subscriptions.requestFailed(fetchPositions.keySet(), time.milliseconds() + retryBackoffMs);
                     metadata.requestUpdate();
 
-                    setFatalOffsetForLeaderException(e);
+                    if (!(e instanceof RetriableException)) {

Review comment:
       Why do we move this check out of `setFatalOffsetForLeaderException`?

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -1325,23 +1323,15 @@ private CompletedFetch initializeCompletedFetch(CompletedFetch nextCompletedFetc
         return completedFetch;
     }
 
-    private void handleOffsetOutOfRange(FetchPosition fetchPosition,
-                                        TopicPartition topicPartition,
-                                        String reason) {
+    private void handleOffsetOutOfRange(FetchPosition fetchPosition, TopicPartition topicPartition) {
+        String errorMessage = "Fetch position " + fetchPosition + " is out of range for partition " + topicPartition;
         if (subscriptions.hasDefaultOffsetResetPolicy()) {
-            log.info("Fetch offset epoch {} is out of range for partition {}, resetting offset",
-                fetchPosition, topicPartition);
+            log.info("{}, resetting offset", errorMessage);
             subscriptions.requestOffsetReset(topicPartition);
         } else {
-            Map<TopicPartition, Long> offsetOutOfRangePartitions =
-                Collections.singletonMap(topicPartition, fetchPosition.offset);
-            String errorMessage = String.format("Offsets out of range " +
-                "with no configured reset policy for partitions: %s" +
-                ", for fetch offset: %d, " +
-                "root cause: %s",
-                offsetOutOfRangePartitions, fetchPosition.offset, reason);
-            log.info(errorMessage);
-            throw new OffsetOutOfRangeException(errorMessage, offsetOutOfRangePartitions);
+            log.info("{}, raising error to the application", errorMessage);
+            throw new OffsetOutOfRangeException(errorMessage,

Review comment:
       Should still mention `no configured reset policy` here IMHO.

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
##########
@@ -3856,12 +3857,16 @@ private void testOffsetValidationWithGivenEpochOffset(final EpochEndOffset epoch
         assertEquals(initialOffset, subscriptions.position(tp0).offset);
 
         if (offsetResetStrategy == OffsetResetStrategy.NONE) {
-            OffsetOutOfRangeException thrown =
-                assertThrows(OffsetOutOfRangeException.class, () -> fetcher.validateOffsetsIfNeeded());
+            LogTruncationException thrown =
+                assertThrows(LogTruncationException.class, () -> fetcher.validateOffsetsIfNeeded());
+            assertEquals(initialOffset, thrown.offsetOutOfRangePartitions().get(tp0).longValue());
 
-            // If epoch offset is valid, we are testing for the log truncation case.
-            if (!epochEndOffset.hasUndefinedEpochOrOffset()) {
-                assertTrue(thrown instanceof LogTruncationException);
+            if (epochEndOffset.hasUndefinedEpochOrOffset()) {
+                assertFalse(thrown.divergentOffsets().containsKey(tp0));
+            } else {
+                OffsetAndMetadata expectedDivergentOffset = new OffsetAndMetadata(
+                    epochEndOffset.endOffset(), Optional.of(epochEndOffset.leaderEpoch()), "");
+                assertEquals(expectedDivergentOffset, thrown.divergentOffsets().get(tp0));

Review comment:
       Similar here, compare against an actual map to ensure no unexpected topic partitions exist.

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
##########
@@ -3856,12 +3857,16 @@ private void testOffsetValidationWithGivenEpochOffset(final EpochEndOffset epoch
         assertEquals(initialOffset, subscriptions.position(tp0).offset);
 
         if (offsetResetStrategy == OffsetResetStrategy.NONE) {
-            OffsetOutOfRangeException thrown =
-                assertThrows(OffsetOutOfRangeException.class, () -> fetcher.validateOffsetsIfNeeded());
+            LogTruncationException thrown =
+                assertThrows(LogTruncationException.class, () -> fetcher.validateOffsetsIfNeeded());
+            assertEquals(initialOffset, thrown.offsetOutOfRangePartitions().get(tp0).longValue());

Review comment:
       We could just build the actual singleton mapping to compare with `thrown.offsetOutOfRangePartitions()`

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##########
@@ -1050,4 +1062,43 @@ public String toString() {
                     '}';
         }
     }
+
+    public static class LogTruncation {
+        public final TopicPartition topicPartition;
+        public final FetchPosition fetchPosition;
+        public final Optional<OffsetAndMetadata> divergentOffsetOpt;
+
+        public LogTruncation(TopicPartition topicPartition,
+                             FetchPosition fetchPosition,
+                             Optional<OffsetAndMetadata> divergentOffsetOpt) {
+            this.topicPartition = topicPartition;
+            this.fetchPosition = fetchPosition;

Review comment:
       Should require non-null for `fetchPosition`

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
##########
@@ -572,7 +573,7 @@ public void testMaybeCompleteValidationAfterPositionChange() {
                 Optional.of(updateOffsetEpoch), new Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(currentEpoch)));
         state.seekUnvalidated(tp0, updatePosition);
 
-        Optional<OffsetAndMetadata> divergentOffsetMetadataOpt = state.maybeCompleteValidation(tp0, initialPosition,
+        Optional<LogTruncation> divergentOffsetMetadataOpt = state.maybeCompleteValidation(tp0, initialPosition,

Review comment:
       divergentOffsetMetadataOpt -> logTruncationOpt, same for the rest.

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##########
@@ -480,6 +480,17 @@ public synchronized boolean maybeValidatePositionForCurrentLeader(ApiVersions ap
                 log.debug("Skipping completed validation for partition {} since the current position {} " +
                                 "no longer matches the position {} when the request was sent",
                         tp, currentPosition, requestPosition);
+            } else if (epochEndOffset.hasUndefinedEpochOrOffset()) {

Review comment:
       I was thinking where is the best to put the check, since previously it was before `maybeCompleteValidation`. If the partition is not awaiting validation or the returned result doesn't match our current position, should we still trigger undefined epoch offset logic here?

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
##########
@@ -3856,12 +3857,16 @@ private void testOffsetValidationWithGivenEpochOffset(final EpochEndOffset epoch
         assertEquals(initialOffset, subscriptions.position(tp0).offset);
 
         if (offsetResetStrategy == OffsetResetStrategy.NONE) {
-            OffsetOutOfRangeException thrown =
-                assertThrows(OffsetOutOfRangeException.class, () -> fetcher.validateOffsetsIfNeeded());
+            LogTruncationException thrown =
+                assertThrows(LogTruncationException.class, () -> fetcher.validateOffsetsIfNeeded());
+            assertEquals(initialOffset, thrown.offsetOutOfRangePartitions().get(tp0).longValue());
 
-            // If epoch offset is valid, we are testing for the log truncation case.
-            if (!epochEndOffset.hasUndefinedEpochOrOffset()) {
-                assertTrue(thrown instanceof LogTruncationException);
+            if (epochEndOffset.hasUndefinedEpochOrOffset()) {
+                assertFalse(thrown.divergentOffsets().containsKey(tp0));

Review comment:
       Should we just assert `thrown.divergentOffsets().isEmpty()`?

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -1325,23 +1323,15 @@ private CompletedFetch initializeCompletedFetch(CompletedFetch nextCompletedFetc
         return completedFetch;
     }
 
-    private void handleOffsetOutOfRange(FetchPosition fetchPosition,
-                                        TopicPartition topicPartition,
-                                        String reason) {
+    private void handleOffsetOutOfRange(FetchPosition fetchPosition, TopicPartition topicPartition) {

Review comment:
       I feel we could still share `handleOffsetOutOfRange` in two places by letting it return a struct of `Optional<LogTruncation>` and decide when to throw it by the caller.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #8822: KAFKA-10113; Specify fetch offsets correctly in LogTruncationException

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #8822:
URL: https://github.com/apache/kafka/pull/8822#discussion_r436386913



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -852,14 +835,28 @@ public void onFailure(RuntimeException e) {
                     subscriptions.requestFailed(fetchPositions.keySet(), time.milliseconds() + retryBackoffMs);
                     metadata.requestUpdate();
 
-                    setFatalOffsetForLeaderException(e);
+                    if (!(e instanceof RetriableException)) {

Review comment:
       Seemed inconsistent to have a method named `setFatal` which checks for retriable exceptions.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on pull request #8822: KAFKA-10113; Specify fetch offsets correctly in LogTruncationException

Posted by GitBox <gi...@apache.org>.
mumrah commented on pull request #8822:
URL: https://github.com/apache/kafka/pull/8822#issuecomment-646361035


   Sounds good to me


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #8822: KAFKA-10113; Specify fetch offsets correctly in LogTruncationException

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #8822:
URL: https://github.com/apache/kafka/pull/8822#discussion_r438242115



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##########
@@ -466,9 +466,9 @@ public synchronized boolean maybeValidatePositionForCurrentLeader(ApiVersions ap
      * Attempt to complete validation with the end offset returned from the OffsetForLeaderEpoch request.
      * @return The diverging offset if truncation was detected and no reset policy is defined.

Review comment:
       Need to update the return javadoc

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##########
@@ -480,6 +480,17 @@ public synchronized boolean maybeValidatePositionForCurrentLeader(ApiVersions ap
                 log.debug("Skipping completed validation for partition {} since the current position {} " +
                                 "no longer matches the position {} when the request was sent",
                         tp, currentPosition, requestPosition);
+            } else if (epochEndOffset.hasUndefinedEpochOrOffset()) {

Review comment:
       Ok, so before this change, we were raising OffsetOutOfRangeException regardless of the state of the subscription which meant that a regular truncation case was being masked as a failed offset validation?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #8822: KAFKA-10113; Specify fetch offsets correctly in LogTruncationException

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #8822:
URL: https://github.com/apache/kafka/pull/8822#discussion_r436391255



##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
##########
@@ -3856,12 +3857,16 @@ private void testOffsetValidationWithGivenEpochOffset(final EpochEndOffset epoch
         assertEquals(initialOffset, subscriptions.position(tp0).offset);
 
         if (offsetResetStrategy == OffsetResetStrategy.NONE) {
-            OffsetOutOfRangeException thrown =
-                assertThrows(OffsetOutOfRangeException.class, () -> fetcher.validateOffsetsIfNeeded());
+            LogTruncationException thrown =
+                assertThrows(LogTruncationException.class, () -> fetcher.validateOffsetsIfNeeded());
+            assertEquals(initialOffset, thrown.offsetOutOfRangePartitions().get(tp0).longValue());
 
-            // If epoch offset is valid, we are testing for the log truncation case.
-            if (!epochEndOffset.hasUndefinedEpochOrOffset()) {
-                assertTrue(thrown instanceof LogTruncationException);
+            if (epochEndOffset.hasUndefinedEpochOrOffset()) {
+                assertFalse(thrown.divergentOffsets().containsKey(tp0));

Review comment:
       I did assertEquals with an empty map.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #8822: KAFKA-10113; Specify fetch offsets correctly in LogTruncationException

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #8822:
URL: https://github.com/apache/kafka/pull/8822#discussion_r436390686



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -1325,23 +1323,15 @@ private CompletedFetch initializeCompletedFetch(CompletedFetch nextCompletedFetc
         return completedFetch;
     }
 
-    private void handleOffsetOutOfRange(FetchPosition fetchPosition,
-                                        TopicPartition topicPartition,
-                                        String reason) {
+    private void handleOffsetOutOfRange(FetchPosition fetchPosition, TopicPartition topicPartition) {

Review comment:
       After thinking about it, it seemed simpler to always use LogTruncationException for validation failures, even if the divergent offset is not known. Then direct OffsetOutOfRange errors are reserved for fetch responses which indicate the OFFSET_OUT_OF_RANGE error.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on pull request #8822: KAFKA-10113; Specify fetch offsets correctly in LogTruncationException

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #8822:
URL: https://github.com/apache/kafka/pull/8822#issuecomment-645526618


   @mumrah Hmm, we have the following check in `initializeCompletedFetch`:
   ```java
               if (!subscriptions.hasValidPosition(tp)) {
                   // this can happen when a rebalance happened while fetch is still in-flight
                   log.debug("Ignoring fetched records for partition {} since it no longer has valid position", tp);
               } else if (error == Errors.NONE) {
   ```
   Are you suggesting we may want to remove this check?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji merged pull request #8822: KAFKA-10113; Specify fetch offsets correctly in LogTruncationException

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #8822:
URL: https://github.com/apache/kafka/pull/8822


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #8822: KAFKA-10113; Specify fetch offsets correctly in LogTruncationException

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #8822:
URL: https://github.com/apache/kafka/pull/8822#discussion_r436387154



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##########
@@ -480,6 +480,17 @@ public synchronized boolean maybeValidatePositionForCurrentLeader(ApiVersions ap
                 log.debug("Skipping completed validation for partition {} since the current position {} " +
                                 "no longer matches the position {} when the request was sent",
                         tp, currentPosition, requestPosition);
+            } else if (epochEndOffset.hasUndefinedEpochOrOffset()) {

Review comment:
       If the fetch position has changed or we are no longer awaiting validation, we want to ignore the result. This was a bug in the previous patch which we didn't catch.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on pull request #8822: KAFKA-10113; Specify fetch offsets correctly in LogTruncationException

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #8822:
URL: https://github.com/apache/kafka/pull/8822#issuecomment-646168538


   retest this please


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on pull request #8822: KAFKA-10113; Specify fetch offsets correctly in LogTruncationException

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #8822:
URL: https://github.com/apache/kafka/pull/8822#issuecomment-646348733


   @mumrah Hmm, I think I like the current approach of discarding the response if we're no longer in the same state in which the fetch state was sent. Mainly because it's simple. Arguably we could do something more refined. For example, a topic authorization error is still going to be relevant even if the partition is being reset. However, since we're talking about rare cases, it doesn't seem too worthwhile to try and optimize; worst case, we'll send the request again and get the same error.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #8822: KAFKA-10113; Specify fetch offsets correctly in LogTruncationException

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #8822:
URL: https://github.com/apache/kafka/pull/8822#discussion_r441719668



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##########
@@ -480,6 +480,17 @@ public synchronized boolean maybeValidatePositionForCurrentLeader(ApiVersions ap
                 log.debug("Skipping completed validation for partition {} since the current position {} " +
                                 "no longer matches the position {} when the request was sent",
                         tp, currentPosition, requestPosition);
+            } else if (epochEndOffset.hasUndefinedEpochOrOffset()) {

Review comment:
       It was what @abbccdda and I had agreed in the previous PR. The problem was that we didn't have divergent offsets to include in the exception, so we just raised it as OffsetOutOfRange. After I noticed the problem with LogTruncationException here, I decided to just simplify the logic here and return the truncation exception with the divergent offsets undefined.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on pull request #8822: KAFKA-10113; Specify fetch offsets correctly in LogTruncationException

Posted by GitBox <gi...@apache.org>.
mumrah commented on pull request #8822:
URL: https://github.com/apache/kafka/pull/8822#issuecomment-646338829


   @hachikuji yea that's the check I was referring to (where we disregard the fetch response, errors included). Do you think any of the errors we handle besides OOOR are worth handling in the case that we're no longer in the FETCHING state? Like maybe one of the errors that triggers a metadata update?
   
   However, that might be adding complexity for little gain. I'm fine with it either way.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on a change in pull request #8822: KAFKA-10113; Specify fetch offsets correctly in LogTruncationException

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #8822:
URL: https://github.com/apache/kafka/pull/8822#discussion_r436800666



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##########
@@ -1050,4 +1062,43 @@ public String toString() {
                     '}';
         }
     }
+
+    public static class LogTruncation {
+        public final TopicPartition topicPartition;
+        public final FetchPosition fetchPosition;
+        public final Optional<OffsetAndMetadata> divergentOffsetOpt;
+
+        public LogTruncation(TopicPartition topicPartition,
+                             FetchPosition fetchPosition,
+                             Optional<OffsetAndMetadata> divergentOffsetOpt) {
+            this.topicPartition = topicPartition;
+            this.fetchPosition = fetchPosition;

Review comment:
       After a second thought, I don't feel strong about it. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org