Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/02 22:50:56 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable

hachikuji commented on a change in pull request #8486:
URL: https://github.com/apache/kafka/pull/8486#discussion_r434208948



##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
##########
@@ -611,7 +611,7 @@ public void testHeartbeatIllegalGenerationResponseWithOldGeneration() throws Int
 
         final AbstractCoordinator.Generation currGen = coordinator.generation();
 
-        // let the heartbeat request to send out a request
+        // let the heartbeat thread to send out a request

Review comment:
       nit: while we're at it, drop the "to"?
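
      (With the nit applied, the line would read: `// let the heartbeat thread send out a request`.)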

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java
##########
@@ -86,4 +84,9 @@ public boolean equals(Object o) {
     public int hashCode() {
         return Objects.hash(error, leaderEpoch, endOffset);
     }
+
+    public boolean hasUndefinedEpochOrOffset() {
+        return this.endOffset == UNDEFINED_EPOCH_OFFSET ||

Review comment:
       Older versions did not return the epoch, so it was possible to see an offset defined without an epoch. However, the version that the consumer relies on should always have both or neither. Anyway, I think it is reasonable to be a little stricter here.
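
      For reference, the quoted hunk is cut off mid-expression. A minimal sketch of what the complete check presumably
      looks like, assuming it tests both sentinels used elsewhere in this PR (the second operand is an assumption,
      not quoted from the diff):

          // Sketch only: treat either sentinel as "the broker did not report a usable epoch/offset pair".
          public boolean hasUndefinedEpochOrOffset() {
              return this.endOffset == UNDEFINED_EPOCH_OFFSET ||
                  this.leaderEpoch == UNDEFINED_EPOCH;
          }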

##########
File path: clients/src/test/java/org/apache/kafka/common/requests/EpochEndOffsetTest.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.Errors;
+import org.junit.Test;
+
+import static org.apache.kafka.common.requests.EpochEndOffset.UNDEFINED_EPOCH;
+import static org.apache.kafka.common.requests.EpochEndOffset.UNDEFINED_EPOCH_OFFSET;
+import static org.junit.Assert.assertEquals;
+
+public class EpochEndOffsetTest {
+
+    @Test
+    public void testConstructor() {
+        int leaderEpoch = 5;
+        long endOffset = 10L;
+        EpochEndOffset epochEndOffset = new EpochEndOffset(Errors.FENCED_LEADER_EPOCH, leaderEpoch, endOffset);
+
+        verify(leaderEpoch, endOffset, true, Errors.FENCED_LEADER_EPOCH, false, epochEndOffset);

Review comment:
       nit: a matter of taste I guess, but I find the tests are easier to follow when the assertions are inline.
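
      For illustration, an inlined version might look roughly like the following. This is a sketch: the accessors
      `error()`, `hasError()`, `leaderEpoch()`, and `endOffset()` are assumed from the existing `EpochEndOffset` API,
      `hasUndefinedEpochOrOffset()` is the method added in this PR, and the extra static imports (`assertTrue`,
      `assertFalse`) would be needed.

          @Test
          public void testConstructor() {
              EpochEndOffset epochEndOffset = new EpochEndOffset(Errors.FENCED_LEADER_EPOCH, 5, 10L);

              // Assertions inline, rather than routed through a shared verify(...) helper.
              assertEquals(Errors.FENCED_LEADER_EPOCH, epochEndOffset.error());
              assertTrue(epochEndOffset.hasError());
              assertEquals(5, epochEndOffset.leaderEpoch());
              assertEquals(10L, epochEndOffset.endOffset());
              assertFalse(epochEndOffset.hasUndefinedEpochOrOffset());
          }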

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -812,13 +813,25 @@ public void onSuccess(OffsetsForLeaderEpochClient.OffsetForEpochResult offsetsRe
                     // For each OffsetsForLeader response, check if the end-offset is lower than our current offset
                     // for the partition. If so, it means we have experienced log truncation and need to reposition
                     // that partition's offset.
+                    //
+                    // In addition, check whether the returned offset and epoch are valid. If not, then we should reset
+                    // its offset if reset policy is configured, or throw out of range exception.
                     offsetsResult.endOffsets().forEach((respTopicPartition, respEndOffset) -> {
-                        SubscriptionState.FetchPosition requestPosition = fetchPostitions.get(respTopicPartition);
-                        Optional<OffsetAndMetadata> divergentOffsetOpt = subscriptions.maybeCompleteValidation(
+                        SubscriptionState.FetchPosition requestPosition = fetchPositions.get(respTopicPartition);
+
+                        if (respEndOffset.hasUndefinedEpochOrOffset()) {
+                            if (subscriptions.hasDefaultOffsetResetPolicy()) {

Review comment:
       I think this is basically the same code we have handling out of range errors from fetch responses. Does it make sense to add a helper?
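
      One possible shape for such a helper, purely as a sketch (the method name and signature are hypothetical, not
      taken from the PR; it reuses the `subscriptions`, `log`, and exception calls already shown in the hunk above):

          // Hypothetical helper consolidating the "reset if a default policy exists, otherwise throw" logic.
          private void resetOffsetOrThrow(TopicPartition partition,
                                          SubscriptionState.FetchPosition position,
                                          String reason) {
              if (subscriptions.hasDefaultOffsetResetPolicy()) {
                  log.info("Fetch position {} is invalid for partition {} ({}), resetting offset",
                      position, partition, reason);
                  subscriptions.requestOffsetReset(partition);
              } else {
                  throw new OffsetOutOfRangeException(
                      Collections.singletonMap(partition, position.offset));
              }
          }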

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -812,13 +813,25 @@ public void onSuccess(OffsetsForLeaderEpochClient.OffsetForEpochResult offsetsRe
                     // For each OffsetsForLeader response, check if the end-offset is lower than our current offset
                     // for the partition. If so, it means we have experienced log truncation and need to reposition
                     // that partition's offset.
+                    //
+                    // In addition, check whether the returned offset and epoch are valid. If not, then we should reset
+                    // its offset if reset policy is configured, or throw out of range exception.
                     offsetsResult.endOffsets().forEach((respTopicPartition, respEndOffset) -> {
-                        SubscriptionState.FetchPosition requestPosition = fetchPostitions.get(respTopicPartition);
-                        Optional<OffsetAndMetadata> divergentOffsetOpt = subscriptions.maybeCompleteValidation(
+                        SubscriptionState.FetchPosition requestPosition = fetchPositions.get(respTopicPartition);
+
+                        if (respEndOffset.hasUndefinedEpochOrOffset()) {
+                            if (subscriptions.hasDefaultOffsetResetPolicy()) {
+                                log.info("Fetch offset {} is out of range for partition {}, resetting offset", requestPosition, respTopicPartition);

Review comment:
       Could we add some detail to this message? Maybe something like "Leader reported no end offset larger than the current fetch epoch".
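
      For example, the log line might become something like the following (wording illustrative only):

          log.info("Resetting offset for partition {} at fetch position {}: leader reported no end offset " +
              "larger than the current fetch epoch", respTopicPartition, requestPosition);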

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -812,13 +813,25 @@ public void onSuccess(OffsetsForLeaderEpochClient.OffsetForEpochResult offsetsRe
                     // For each OffsetsForLeader response, check if the end-offset is lower than our current offset
                     // for the partition. If so, it means we have experienced log truncation and need to reposition
                     // that partition's offset.
+                    //
+                    // In addition, check whether the returned offset and epoch are valid. If not, then we should reset
+                    // its offset if reset policy is configured, or throw out of range exception.
                     offsetsResult.endOffsets().forEach((respTopicPartition, respEndOffset) -> {
-                        SubscriptionState.FetchPosition requestPosition = fetchPostitions.get(respTopicPartition);
-                        Optional<OffsetAndMetadata> divergentOffsetOpt = subscriptions.maybeCompleteValidation(
+                        SubscriptionState.FetchPosition requestPosition = fetchPositions.get(respTopicPartition);
+
+                        if (respEndOffset.hasUndefinedEpochOrOffset()) {
+                            if (subscriptions.hasDefaultOffsetResetPolicy()) {
+                                log.info("Fetch offset {} is out of range for partition {}, resetting offset", requestPosition, respTopicPartition);
+                                subscriptions.requestOffsetReset(respTopicPartition);
+                            } else {
+                                throw new OffsetOutOfRangeException(Collections.singletonMap(respTopicPartition, requestPosition.offset));

Review comment:
       It would also be useful to include the epoch in the exception message, along with a similar change to emphasize that this was raised during epoch validation.
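
      For example, assuming `OffsetOutOfRangeException` has a message-taking constructor and that the fetch epoch is
      reachable via `requestPosition.offsetEpoch` (both are assumptions, not confirmed by the diff), the throw could
      look roughly like:

          throw new OffsetOutOfRangeException(
              "Undefined end offset or epoch returned during offset validation for partition " +
                  respTopicPartition + " at fetch offset " + requestPosition.offset +
                  " with fetch epoch " + requestPosition.offsetEpoch,
              Collections.singletonMap(respTopicPartition, requestPosition.offset));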

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
##########
@@ -3751,6 +3755,85 @@ public void testOffsetValidationSkippedForOldBroker() {
         }
     }
 
+    @Test
+    public void testOffsetValidationSkippedForOldResponse() {
+        // Old responses may provide unreliable leader epoch,
+        // so we should skip offset validation and not send the request.
+        buildFetcher();
+        assignFromUser(singleton(tp0));
+
+        Map<String, Integer> partitionCounts = new HashMap<>();
+        partitionCounts.put(tp0.topic(), 4);
+
+        final int epochOne = 1;
+
+        metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy", 1,
+            Collections.emptyMap(), partitionCounts, tp -> epochOne), false, 0L);
+
+        Node node = metadata.fetch().nodes().get(0);
+        assertFalse(client.isConnected(node.idString()));
+
+        // Seek with a position and leader+epoch
+        Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(
+            metadata.currentLeader(tp0).leader, Optional.of(epochOne));
+        subscriptions.seekUnvalidated(tp0, new SubscriptionState.FetchPosition(20L, Optional.of(epochOne), leaderAndEpoch));
+        assertFalse(client.isConnected(node.idString()));
+        assertTrue(subscriptions.awaitingValidation(tp0));
+
+        // Inject an older version of the metadata response
+        final short responseVersion = 8;
+        metadata.updateWithCurrentRequestVersion(TestUtils.metadataUpdateWith("dummy", 1,
+            Collections.emptyMap(), partitionCounts, responseVersion), false, 0L);
+        fetcher.validateOffsetsIfNeeded();
+        // Offset validation is skipped
+        assertFalse(subscriptions.awaitingValidation(tp0));
+    }
+
+    @Test
+    public void testOffsetValidationResetOffsetForUndefinedEpoch() {
+        testOffsetValidationWithGivenEpochOffset(new EpochEndOffset(EpochEndOffset.UNDEFINED_EPOCH, 0L));
+    }
+
+    @Test
+    public void testOffsetValidationResetOffsetForUndefinedOffset() {
+        testOffsetValidationWithGivenEpochOffset(new EpochEndOffset(2, EpochEndOffset.UNDEFINED_EPOCH_OFFSET));
+    }
+
+    private void testOffsetValidationWithGivenEpochOffset(final EpochEndOffset epochEndOffset) {

Review comment:
       Would it also be useful to check the case where no reset policy is defined?

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
##########
@@ -149,7 +148,11 @@
     private TopicPartition tp1 = new TopicPartition(topicName, 1);
     private TopicPartition tp2 = new TopicPartition(topicName, 2);
     private TopicPartition tp3 = new TopicPartition(topicName, 3);
-    private MetadataResponse initialUpdateResponse = TestUtils.metadataUpdateWith(1, singletonMap(topicName, 4));
+    private int validLeaderEpoch = 0;
+    private MetadataResponse initialUpdateResponse =
+        TestUtils.metadataUpdateWith(1, singletonMap(topicName, 4));
+    private MetadataResponse initialUpdateResponseWithLeaderEpoch =

Review comment:
       nit: if one of these is an uncommon case, maybe we can make it local to the test cases that need it.
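
      As a sketch of the local alternative, mirroring the call shape used in `testOffsetValidationSkippedForOldResponse`
      above (details assumed, not prescriptive):

          // Construct the epoch-bearing metadata response only in the tests that need it.
          MetadataResponse updateWithLeaderEpoch = TestUtils.metadataUpdateWith("dummy", 1,
              Collections.emptyMap(), singletonMap(topicName, 4), tp -> validLeaderEpoch);
          client.updateMetadata(updateWithLeaderEpoch);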

##########
File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
##########
@@ -337,7 +345,7 @@ public void testFetchSkipsBlackedOutNodes() {
 
         assignFromUser(singleton(tp0));
         subscriptions.seek(tp0, 0);
-        client.updateMetadata(initialUpdateResponse);
+        client.updateMetadata(initialUpdateResponseWithLeaderEpoch);

Review comment:
       Do we need to change some of these other tests? We're still using `initialUpdateResponse` above in `assignFromUser` and below.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org