Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/04/23 00:13:58 UTC

[GitHub] [kafka] guozhangwang commented on a change in pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable

guozhangwang commented on a change in pull request #8486:
URL: https://github.com/apache/kafka/pull/8486#discussion_r413417021



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -498,6 +498,104 @@ public void validateOffsetsIfNeeded() {
         validateOffsetsAsync(partitionsToValidate);
     }
 
+    /**
+     * For each partition which needs validation, make an asynchronous request to get the end-offsets for the partition
+     * with the epoch less than or equal to the epoch the partition last saw.
+     *
+     * Requests are grouped by Node for efficiency.
+     */
+    private void validateOffsetsAsync(Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate) {
+        final Map<Node, Map<TopicPartition, SubscriptionState.FetchPosition>> regrouped =
+            regroupFetchPositionsByLeader(partitionsToValidate);
+
+        regrouped.forEach((node, fetchPositions) -> {
+            if (node.isEmpty()) {
+                metadata.requestUpdate();
+                return;
+            }
+
+            NodeApiVersions nodeApiVersions = apiVersions.get(node.idString());
+            if (nodeApiVersions == null) {
+                client.tryConnect(node);
+                return;
+            }
+
+            if (!hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
+                log.debug("Skipping validation of fetch offsets for partitions {} since the broker does not " +
+                              "support the required protocol version (introduced in Kafka 2.3)",
+                    fetchPositions.keySet());
+                completeAllValidations(fetchPositions);
+                return;
+            }
+
+            // We need to get the client epoch state before sending out the leader epoch request, and use it to
+            // decide whether we need to validate offsets.
+            if (!metadata.hasReliableLeaderEpochs()) {
+                log.debug("Skipping validation of fetch offsets for partitions {} since the provided leader broker " +
+                              "is not reliable", fetchPositions.keySet());
+                completeAllValidations(fetchPositions);
+                return;
+            }
+
+            subscriptions.setNextAllowedRetry(fetchPositions.keySet(), time.milliseconds() + requestTimeoutMs);
+
+            RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future =
+                offsetsForLeaderEpochClient.sendAsyncRequest(node, fetchPositions);
+
+            future.addListener(new RequestFutureListener<OffsetsForLeaderEpochClient.OffsetForEpochResult>() {
+                @Override
+                public void onSuccess(OffsetsForLeaderEpochClient.OffsetForEpochResult offsetsResult) {
+                    Map<TopicPartition, OffsetAndMetadata> truncationWithoutResetPolicy = new HashMap<>();
+                    if (!offsetsResult.partitionsToRetry().isEmpty()) {
+                        subscriptions.setNextAllowedRetry(offsetsResult.partitionsToRetry(), time.milliseconds() + retryBackoffMs);
+                        metadata.requestUpdate();
+                    }
+
+                    // For each OffsetsForLeader response, check if the end-offset is lower than our current offset
+                    // for the partition. If so, it means we have experienced log truncation and need to reposition
+                    // that partition's offset.
+                    //
+                    // In addition, check whether the returned offset and epoch are valid. If not, then we should treat
+                    // it as out of range and update metadata for rediscovery.
+                    offsetsResult.endOffsets().forEach((respTopicPartition, respEndOffset) -> {
+                        if (respEndOffset.hasUndefinedEpochOrOffset()) {
+                            // Should attempt to find the new leader in the next try.
+                            log.debug("Requesting metadata update for partition {} due to undefined epoch or offset {}",

Review comment:
       nit: `... or offset {} from OffsetsForLeaderEpoch response`
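The code comments in the hunk above describe two checks on each OffsetsForLeaderEpoch response entry: an undefined epoch or offset leads to a metadata update, and an end offset lower than the current position signals log truncation. Below is a minimal, standalone sketch of that decision. It is not the code from this PR: the class `TruncationCheckSketch`, the `Outcome` enum, the `classify` method, and the `-1` sentinel values are all assumptions made for illustration.

```java
// Hypothetical illustration only; none of these names come from the Kafka code base.
final class TruncationCheckSketch {
    // Assumed sentinel values for "undefined"; the real constants may differ.
    static final int UNDEFINED_EPOCH = -1;
    static final long UNDEFINED_EPOCH_OFFSET = -1L;

    enum Outcome { REQUEST_METADATA_UPDATE, TRUNCATED, VALID }

    // Classifies one response entry against the consumer's current position.
    static Outcome classify(long currentOffset, int respEpoch, long respEndOffset) {
        // An undefined epoch or offset cannot be compared; rediscover the leader instead.
        if (respEpoch == UNDEFINED_EPOCH || respEndOffset == UNDEFINED_EPOCH_OFFSET) {
            return Outcome.REQUEST_METADATA_UPDATE;
        }
        // The log ends before our position, so records we had consumed were truncated.
        if (respEndOffset < currentOffset) {
            return Outcome.TRUNCATED;
        }
        return Outcome.VALID;
    }

    public static void main(String[] args) {
        System.out.println(classify(100L, 5, 80L));   // prints TRUNCATED
        System.out.println(classify(100L, -1, -1L));  // prints REQUEST_METADATA_UPDATE
        System.out.println(classify(100L, 5, 120L));  // prints VALID
    }
}
```

Checking for undefined values first matters in this sketch: with `-1` as the sentinel, an undefined end offset would otherwise compare as lower than any real position and be misread as truncation.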

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java
##########
@@ -86,4 +84,9 @@ public boolean equals(Object o) {
     public int hashCode() {
         return Objects.hash(error, leaderEpoch, endOffset);
     }
+
+    public boolean hasUndefinedEpochOrOffset() {
+        return this.endOffset == UNDEFINED_EPOCH_OFFSET ||

Review comment:
       For my own understanding: if `endOffset` is UNDEFINED, should the epoch always be UNDEFINED too? If that's the case, could we rely on `leaderEpoch` alone?
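To make the question concrete, here is a small standalone sketch comparing the combined check with an epoch-only check. It is an assumption-laden illustration, not the actual `EpochEndOffset` class: the class name `EpochEndOffsetSketch`, the `-1` sentinel values, the second operand of the `||` (the quoted diff is cut off before it), and the `hasUndefinedEpoch` helper are all hypothetical.

```java
// Hypothetical stand-in for EpochEndOffset; not the real Kafka class.
final class EpochEndOffsetSketch {
    // Assumed sentinel values; the real constants may differ.
    static final int UNDEFINED_EPOCH = -1;
    static final long UNDEFINED_EPOCH_OFFSET = -1L;

    final int leaderEpoch;
    final long endOffset;

    EpochEndOffsetSketch(int leaderEpoch, long endOffset) {
        this.leaderEpoch = leaderEpoch;
        this.endOffset = endOffset;
    }

    // Combined check; the right-hand operand is an assumption, since the diff is truncated.
    boolean hasUndefinedEpochOrOffset() {
        return endOffset == UNDEFINED_EPOCH_OFFSET || leaderEpoch == UNDEFINED_EPOCH;
    }

    // Epoch-only alternative raised in the review question (hypothetical helper).
    boolean hasUndefinedEpoch() {
        return leaderEpoch == UNDEFINED_EPOCH;
    }

    public static void main(String[] args) {
        // The two checks disagree only if the offset is undefined while the epoch is defined.
        EpochEndOffsetSketch mixed = new EpochEndOffsetSketch(5, UNDEFINED_EPOCH_OFFSET);
        System.out.println(mixed.hasUndefinedEpochOrOffset()); // prints true
        System.out.println(mixed.hasUndefinedEpoch());         // prints false
    }
}
```

If a broker never returns an undefined offset together with a defined epoch, the two methods are equivalent and the epoch-only form would suffice; the sketch cannot settle whether that invariant holds.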

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -498,6 +498,104 @@ public void validateOffsetsIfNeeded() {
         validateOffsetsAsync(partitionsToValidate);
     }
 
+    /**
+     * For each partition which needs validation, make an asynchronous request to get the end-offsets for the partition
+     * with the epoch less than or equal to the epoch the partition last saw.
+     *
+     * Requests are grouped by Node for efficiency.
+     */
+    private void validateOffsetsAsync(Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate) {
+        final Map<Node, Map<TopicPartition, SubscriptionState.FetchPosition>> regrouped =
+            regroupFetchPositionsByLeader(partitionsToValidate);
+
+        regrouped.forEach((node, fetchPositions) -> {
+            if (node.isEmpty()) {
+                metadata.requestUpdate();
+                return;
+            }
+
+            NodeApiVersions nodeApiVersions = apiVersions.get(node.idString());
+            if (nodeApiVersions == null) {
+                client.tryConnect(node);
+                return;
+            }
+
+            if (!hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
+                log.debug("Skipping validation of fetch offsets for partitions {} since the broker does not " +
+                              "support the required protocol version (introduced in Kafka 2.3)",
+                    fetchPositions.keySet());
+                completeAllValidations(fetchPositions);
+                return;
+            }
+
+            // We need to get the client epoch state before sending out the leader epoch request, and use it to
+            // decide whether we need to validate offsets.
+            if (!metadata.hasReliableLeaderEpochs()) {

Review comment:
       Could you avoid moving the function while changing it, so that the diff is easier to check? E.g. here I'd have to blindly trust you that the only changes are within this func :)

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
##########
@@ -85,10 +85,6 @@ protected OffsetForEpochResult handleResponse(
                 case KAFKA_STORAGE_ERROR:
                 case OFFSET_NOT_AVAILABLE:
                 case LEADER_NOT_AVAILABLE:
-                    logger().debug("Attempt to fetch offsets for partition {} failed due to {}, retrying.",

Review comment:
       Why can we remove this logic?



