Posted to jira@kafka.apache.org by "kirktrue (via GitHub)" <gi...@apache.org> on 2023/02/03 01:03:17 UTC

[GitHub] [kafka] kirktrue opened a new pull request, #13192: KAFKA-14675: Extract metadata-related tasks from Fetcher into MetadataFetcher

kirktrue opened a new pull request, #13192:
URL: https://github.com/apache/kafka/pull/13192

   Extract the metadata-related APIs from `Fetcher` into a new class named `MetadataFetcher`. This will allow `Fetcher` and `MetadataFetcher` to be refactored for the new consumer. 
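   
   A minimal sketch of the extraction described above, assuming heavily simplified collaborators. `FetcherSketch`, `MetadataFetcherSketch` and `ConsumerSketch` are hypothetical names, and topic metadata is modeled as a plain map of topic name to partition ids. The real classes take a `ConsumerNetworkClient`, `SubscriptionState`, and more.
   
   ```java
   import java.util.Collections;
   import java.util.List;
   import java.util.Map;
   
   // Hypothetical stand-in for the extracted metadata-focused class.
   class MetadataFetcherSketch {
       // Topic -> partition ids; a placeholder for real cluster metadata.
       private final Map<String, List<Integer>> topics;
   
       MetadataFetcherSketch(Map<String, List<Integer>> topics) {
           this.topics = topics;
       }
   
       // Metadata-related API that moves out of the fetcher.
       Map<String, List<Integer>> getAllTopicMetadata() {
           return Collections.unmodifiableMap(topics);
       }
   }
   
   // Hypothetical stand-in for the fetcher after the extraction:
   // only record fetching remains here.
   class FetcherSketch {
       List<String> collectFetch() {
           return Collections.emptyList();
       }
   }
   
   // Hypothetical consumer wiring: each call is routed to the class that owns it.
   class ConsumerSketch {
       private final FetcherSketch fetcher;
       private final MetadataFetcherSketch metadataFetcher;
   
       ConsumerSketch(FetcherSketch fetcher, MetadataFetcherSketch metadataFetcher) {
           this.fetcher = fetcher;
           this.metadataFetcher = metadataFetcher;
       }
   
       // Topic listing goes to the metadata-focused class...
       Map<String, List<Integer>> listTopics() {
           return metadataFetcher.getAllTopicMetadata();
       }
   
       // ...while polling for records still goes through the fetcher.
       List<String> poll() {
           return fetcher.collectFetch();
       }
   }
   ```
   
   With the two concerns in separate classes, either one can be reworked without editing the other.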
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji commented on a diff in pull request #13192: KAFKA-14675: Extract metadata-related tasks from Fetcher into MetadataFetcher 1/4

Posted by "hachikuji (via GitHub)" <gi...@apache.org>.
hachikuji commented on code in PR #13192:
URL: https://github.com/apache/kafka/pull/13192#discussion_r1110445520


##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -1249,7 +1263,7 @@ private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetad
                     //
                     // NOTE: since the consumed position has already been updated, we must not allow

Review Comment:
   I think you could argue the comment is still correct, if possibly misleading. We track the consumed position in the same field as the fetch position, but we have indeed updated it at this point. And we might end up bringing back the old field anyway.
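   
   To make the point concrete, here is a hypothetical sketch in which a single field serves as both the fetch position and the consumed position. `PartitionStateSketch` and its methods are illustrative only and are not the actual `SubscriptionState` API. The point is that once records are handed back from `poll()`, the one field already reflects the new consumed position.
   
   ```java
   // Hypothetical sketch: one field holds both the fetch and the consumed position.
   class PartitionStateSketch {
       // Next offset to fetch, which is also the next offset the application will consume.
       private long position;
   
       PartitionStateSketch(long initialPosition) {
           this.position = initialPosition;
       }
   
       long fetchPosition() {
           return position;
       }
   
       long consumedPosition() {
           return position;
       }
   
       // Called when a batch ending just before nextOffset is returned to the application.
       void recordsReturned(long nextOffset) {
           if (nextOffset < position)
               throw new IllegalArgumentException("position must not move backwards");
           position = nextOffset;
       }
   }
   ```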





[GitHub] [kafka] guozhangwang commented on a diff in pull request #13192: KAFKA-14675: Extract metadata-related tasks from Fetcher into MetadataFetcher 1/4

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on code in PR #13192:
URL: https://github.com/apache/kafka/pull/13192#discussion_r1109189406


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MetadataFetcher.java:
##########
@@ -0,0 +1,805 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.StaleMetadataException;
+import org.apache.kafka.clients.consumer.LogTruncationException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.OffsetForEpochResult;
+import org.apache.kafka.clients.consumer.internals.SubscriptionState.FetchPosition;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
+import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
+import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
+import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+public class MetadataFetcher {
+
+    private final Logger log;
+    private final ConsumerMetadata metadata;
+    private final SubscriptionState subscriptions;
+    private final ConsumerNetworkClient client;
+    private final Time time;
+    private final long retryBackoffMs;
+    private final long requestTimeoutMs;
+    private final IsolationLevel isolationLevel;
+    private final AtomicReference<RuntimeException> cachedListOffsetsException = new AtomicReference<>();
+    private final AtomicReference<RuntimeException> cachedOffsetForLeaderException = new AtomicReference<>();
+    private final OffsetsForLeaderEpochClient offsetsForLeaderEpochClient;
+    private final ApiVersions apiVersions;
+    private final AtomicInteger metadataUpdateVersion = new AtomicInteger(-1);
+
+    public MetadataFetcher(LogContext logContext,
+                           ConsumerNetworkClient client,
+                           ConsumerMetadata metadata,
+                           SubscriptionState subscriptions,
+                           Time time,
+                           long retryBackoffMs,
+                           long requestTimeoutMs,
+                           IsolationLevel isolationLevel,
+                           ApiVersions apiVersions) {
+        this.log = logContext.logger(getClass());
+        this.time = time;
+        this.client = client;
+        this.metadata = metadata;
+        this.subscriptions = subscriptions;
+        this.retryBackoffMs = retryBackoffMs;
+        this.requestTimeoutMs = requestTimeoutMs;
+        this.isolationLevel = isolationLevel;
+        this.apiVersions = apiVersions;
+        this.offsetsForLeaderEpochClient = new OffsetsForLeaderEpochClient(client, logContext);
+    }
+
+    /**
+     * Represents data about an offset returned by a broker.
+     */
+    static class ListOffsetData {
+        final long offset;
+        final Long timestamp; //  null if the broker does not support returning timestamps
+        final Optional<Integer> leaderEpoch; // empty if the leader epoch is not known
+
+        ListOffsetData(long offset, Long timestamp, Optional<Integer> leaderEpoch) {
+            this.offset = offset;
+            this.timestamp = timestamp;
+            this.leaderEpoch = leaderEpoch;
+        }
+    }
+
+    private Long offsetResetStrategyTimestamp(final TopicPartition partition) {
+        OffsetResetStrategy strategy = subscriptions.resetStrategy(partition);
+        if (strategy == OffsetResetStrategy.EARLIEST)
+            return ListOffsetsRequest.EARLIEST_TIMESTAMP;
+        else if (strategy == OffsetResetStrategy.LATEST)
+            return ListOffsetsRequest.LATEST_TIMESTAMP;
+        else
+            return null;
+    }
+
+    private OffsetResetStrategy timestampToOffsetResetStrategy(long timestamp) {
+        if (timestamp == ListOffsetsRequest.EARLIEST_TIMESTAMP)
+            return OffsetResetStrategy.EARLIEST;
+        else if (timestamp == ListOffsetsRequest.LATEST_TIMESTAMP)
+            return OffsetResetStrategy.LATEST;
+        else
+            return null;
+    }
+
+    /**
+     * Get topic metadata for all topics in the cluster
+     * @param timer Timer bounding how long this method can block
+     * @return The map of topics with their partition information
+     */
+    public Map<String, List<PartitionInfo>> getAllTopicMetadata(Timer timer) {
+        return getTopicMetadata(MetadataRequest.Builder.allTopics(), timer);
+    }
+
+    /**
+     * Get metadata for all topics present in Kafka cluster
+     *
+     * @param request The MetadataRequest to send
+     * @param timer Timer bounding how long this method can block
+     * @return The map of topics with their partition information
+     */
+    public Map<String, List<PartitionInfo>> getTopicMetadata(MetadataRequest.Builder request, Timer timer) {
+        // Save the round trip if no topics are requested.
+        if (!request.isAllTopics() && request.emptyTopicList())
+            return Collections.emptyMap();
+
+        do {
+            RequestFuture<ClientResponse> future = sendMetadataRequest(request);
+            client.poll(future, timer);
+
+            if (future.failed() && !future.isRetriable())
+                throw future.exception();
+
+            if (future.succeeded()) {
+                MetadataResponse response = (MetadataResponse) future.value().responseBody();
+                Cluster cluster = response.buildCluster();
+
+                Set<String> unauthorizedTopics = cluster.unauthorizedTopics();
+                if (!unauthorizedTopics.isEmpty())
+                    throw new TopicAuthorizationException(unauthorizedTopics);
+
+                boolean shouldRetry = false;
+                Map<String, Errors> errors = response.errors();
+                if (!errors.isEmpty()) {
+                    // if there were errors, we need to check whether they were fatal or whether
+                    // we should just retry
+
+                    log.debug("Topic metadata fetch included errors: {}", errors);
+
+                    for (Map.Entry<String, Errors> errorEntry : errors.entrySet()) {
+                        String topic = errorEntry.getKey();
+                        Errors error = errorEntry.getValue();
+
+                        if (error == Errors.INVALID_TOPIC_EXCEPTION)
+                            throw new InvalidTopicException("Topic '" + topic + "' is invalid");
+                        else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION)
+                            // if a requested topic is unknown, we just continue and let it be absent
+                            // in the returned map
+                            continue;
+                        else if (error.exception() instanceof RetriableException)
+                            shouldRetry = true;
+                        else
+                            throw new KafkaException("Unexpected error fetching metadata for topic " + topic,
+                                    error.exception());
+                    }
+                }
+
+                if (!shouldRetry) {
+                    HashMap<String, List<PartitionInfo>> topicsPartitionInfos = new HashMap<>();
+                    for (String topic : cluster.topics())
+                        topicsPartitionInfos.put(topic, cluster.partitionsForTopic(topic));
+                    return topicsPartitionInfos;
+                }
+            }
+
+            timer.sleep(retryBackoffMs);
+        } while (timer.notExpired());
+
+        throw new TimeoutException("Timeout expired while fetching topic metadata");
+    }
+
+    /**
+     * Send Metadata Request to least loaded node in Kafka cluster asynchronously
+     * @return A future that indicates result of sent metadata request
+     */
+    private RequestFuture<ClientResponse> sendMetadataRequest(MetadataRequest.Builder request) {
+        final Node node = client.leastLoadedNode();
+        if (node == null)
+            return RequestFuture.noBrokersAvailable();
+        else
+            return client.send(node, request);
+    }
+
+    /**
+     * Reset offsets for all assigned partitions that require it.
+     *
+     * @throws org.apache.kafka.clients.consumer.NoOffsetForPartitionException If no offset reset strategy is defined
+     *   and one or more partitions aren't awaiting a seekToBeginning() or seekToEnd().
+     */
+    public void resetOffsetsIfNeeded() {
+        // Raise exception from previous offset fetch if there is one
+        RuntimeException exception = cachedListOffsetsException.getAndSet(null);
+        if (exception != null)
+            throw exception;
+
+        Set<TopicPartition> partitions = subscriptions.partitionsNeedingReset(time.milliseconds());
+        if (partitions.isEmpty())
+            return;
+
+        final Map<TopicPartition, Long> offsetResetTimestamps = new HashMap<>();
+        for (final TopicPartition partition : partitions) {
+            Long timestamp = offsetResetStrategyTimestamp(partition);
+            if (timestamp != null)
+                offsetResetTimestamps.put(partition, timestamp);
+        }
+
+        resetOffsetsAsync(offsetResetTimestamps);
+    }
+
+    /**
+     * Validate offsets for all assigned partitions for which a leader change has been detected.
+     */
+    public void validateOffsetsIfNeeded() {

Review Comment:
   They used to be the same, but as we later added more fields to the position, it became:
   
   ```
   class FetchPosition {
       public final long offset;                    // next offset to fetch
       final Optional<Integer> offsetEpoch;         // epoch associated with that offset, if known
       final Metadata.LeaderAndEpoch currentLeader; // leader and its epoch at the time of the fetch
   }
   ```
   
   When we reset or validate, we touch fields other than the offset itself.
   
   Another reason is to distinguish the `position offset` from the `fetch offset` :)
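   
   A rough sketch of why reset and validation involve more than the offset, assuming simplified, hypothetical stand-ins (`LeaderAndEpochSketch`, `FetchPositionSketch`) for `Metadata.LeaderAndEpoch` and `FetchPosition`. In this sketch, validation in the no-truncation case keeps the offset and its epoch but swaps in the new leader. A reset installs a new offset together with its (possibly unknown) epoch.
   
   ```java
   import java.util.Objects;
   import java.util.Optional;
   
   // Hypothetical stand-in for Metadata.LeaderAndEpoch.
   final class LeaderAndEpochSketch {
       final Optional<String> leader; // leader node id, if known
       final Optional<Integer> epoch; // leader epoch, if known
   
       LeaderAndEpochSketch(Optional<String> leader, Optional<Integer> epoch) {
           this.leader = Objects.requireNonNull(leader);
           this.epoch = Objects.requireNonNull(epoch);
       }
   }
   
   // Hypothetical, simplified FetchPosition. It is immutable, so every reset
   // or validation step produces a new position object.
   final class FetchPositionSketch {
       final long offset;
       final Optional<Integer> offsetEpoch;
       final LeaderAndEpochSketch currentLeader;
   
       FetchPositionSketch(long offset, Optional<Integer> offsetEpoch, LeaderAndEpochSketch currentLeader) {
           this.offset = offset;
           this.offsetEpoch = Objects.requireNonNull(offsetEpoch);
           this.currentLeader = Objects.requireNonNull(currentLeader);
       }
   
       // Validation after a leader change, assuming no truncation was detected:
       // offset and offset epoch are kept, only the leader information changes.
       FetchPositionSketch withLeader(LeaderAndEpochSketch newLeader) {
           return new FetchPositionSketch(offset, offsetEpoch, newLeader);
       }
   
       // Reset: the new offset arrives with its own epoch (possibly unknown),
       // and the leader is whatever the metadata currently reports.
       static FetchPositionSketch reset(long offset, Optional<Integer> epoch, LeaderAndEpochSketch leader) {
           return new FetchPositionSketch(offset, epoch, leader);
       }
   }
   ```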





[GitHub] [kafka] kirktrue commented on a diff in pull request #13192: KAFKA-14675: Extract metadata-related tasks from Fetcher into MetadataFetcher 1/4

Posted by "kirktrue (via GitHub)" <gi...@apache.org>.
kirktrue commented on code in PR #13192:
URL: https://github.com/apache/kafka/pull/13192#discussion_r1106476882


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MetadataFetcher.java:
##########
@@ -0,0 +1,805 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.StaleMetadataException;
+import org.apache.kafka.clients.consumer.LogTruncationException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.OffsetForEpochResult;
+import org.apache.kafka.clients.consumer.internals.SubscriptionState.FetchPosition;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
+import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
+import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
+import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+public class MetadataFetcher {

Review Comment:
   I had called this class `OffsetsFinder` or something similar in a previous iteration, but that name was considered too specific, since the class handles topic metadata too.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MetadataFetcher.java:
##########
@@ -0,0 +1,805 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.StaleMetadataException;
+import org.apache.kafka.clients.consumer.LogTruncationException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.OffsetForEpochResult;
+import org.apache.kafka.clients.consumer.internals.SubscriptionState.FetchPosition;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
+import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
+import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
+import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+public class MetadataFetcher {
+
+    private final Logger log;
+    private final ConsumerMetadata metadata;
+    private final SubscriptionState subscriptions;
+    private final ConsumerNetworkClient client;
+    private final Time time;
+    private final long retryBackoffMs;
+    private final long requestTimeoutMs;
+    private final IsolationLevel isolationLevel;
+    private final AtomicReference<RuntimeException> cachedListOffsetsException = new AtomicReference<>();
+    private final AtomicReference<RuntimeException> cachedOffsetForLeaderException = new AtomicReference<>();
+    private final OffsetsForLeaderEpochClient offsetsForLeaderEpochClient;
+    private final ApiVersions apiVersions;
+    private final AtomicInteger metadataUpdateVersion = new AtomicInteger(-1);

Review Comment:
   OK, I'll look into that.





[GitHub] [kafka] philipnee commented on a diff in pull request #13192: KAFKA-14675: Extract metadata-related tasks from Fetcher into MetadataFetcher 1/4

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13192:
URL: https://github.com/apache/kafka/pull/13192#discussion_r1107931054


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java:
##########
@@ -0,0 +1,717 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.StaleMetadataException;
+import org.apache.kafka.clients.consumer.LogTruncationException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.OffsetForEpochResult;
+import org.apache.kafka.clients.consumer.internals.SubscriptionState.FetchPosition;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
+import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
+import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
+import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+/**
+ * {@link OffsetFetcher} is responsible for fetching the {@link OffsetAndTimestamp offsets} for
+ * a given set of {@link TopicPartition topic and partition pairs} and for validation and resetting of positions,
+ * as needed.
+ */
+public class OffsetFetcher {
+
+    private final Logger log;
+    private final ConsumerMetadata metadata;
+    private final SubscriptionState subscriptions;
+    private final ConsumerNetworkClient client;
+    private final Time time;
+    private final long retryBackoffMs;
+    private final long requestTimeoutMs;
+    private final IsolationLevel isolationLevel;
+    private final AtomicReference<RuntimeException> cachedListOffsetsException = new AtomicReference<>();
+    private final AtomicReference<RuntimeException> cachedOffsetForLeaderException = new AtomicReference<>();
+    private final OffsetsForLeaderEpochClient offsetsForLeaderEpochClient;
+    private final ApiVersions apiVersions;
+    private final AtomicInteger metadataUpdateVersion = new AtomicInteger(-1);
+
+    public OffsetFetcher(LogContext logContext,
+                         ConsumerNetworkClient client,
+                         ConsumerMetadata metadata,
+                         SubscriptionState subscriptions,
+                         Time time,
+                         long retryBackoffMs,
+                         long requestTimeoutMs,
+                         IsolationLevel isolationLevel,
+                         ApiVersions apiVersions) {
+        this.log = logContext.logger(getClass());
+        this.time = time;
+        this.client = client;
+        this.metadata = metadata;
+        this.subscriptions = subscriptions;
+        this.retryBackoffMs = retryBackoffMs;
+        this.requestTimeoutMs = requestTimeoutMs;
+        this.isolationLevel = isolationLevel;
+        this.apiVersions = apiVersions;
+        this.offsetsForLeaderEpochClient = new OffsetsForLeaderEpochClient(client, logContext);
+    }
+
+    /**
+     * Represents data about an offset returned by a broker.
+     */
+    static class ListOffsetData {
+        final long offset;
+        final Long timestamp; //  null if the broker does not support returning timestamps

Review Comment:
   This seems like a good place to use `Optional`.
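   
   A hypothetical version of `ListOffsetData` that uses `Optional<Long>` for the timestamp, mirroring how `leaderEpoch` is already modeled. `ListOffsetDataSketch` and `describe()` are illustrative names only.
   
   ```java
   import java.util.Optional;
   
   // Hypothetical variant of ListOffsetData: the nullable Long timestamp
   // becomes Optional<Long>, like the existing leaderEpoch field.
   final class ListOffsetDataSketch {
       final long offset;
       final Optional<Long> timestamp;      // empty if the broker does not support returning timestamps
       final Optional<Integer> leaderEpoch; // empty if the leader epoch is not known
   
       ListOffsetDataSketch(long offset, Optional<Long> timestamp, Optional<Integer> leaderEpoch) {
           this.offset = offset;
           this.timestamp = timestamp;
           this.leaderEpoch = leaderEpoch;
       }
   
       // Callers handle the missing-timestamp case explicitly instead of null-checking.
       String describe() {
           return timestamp
                   .map(ts -> "offset " + offset + " at " + ts)
                   .orElse("offset " + offset + " (no timestamp)");
       }
   }
   ```
   
   `Optional` is often recommended mainly for return types rather than fields. Because this class already stores `Optional<Integer> leaderEpoch`, though, using it for the timestamp too would keep the class consistent.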





[GitHub] [kafka] kirktrue commented on a diff in pull request #13192: KAFKA-14675: Extract metadata-related tasks from Fetcher into MetadataFetcher 1/4

Posted by "kirktrue (via GitHub)" <gi...@apache.org>.
kirktrue commented on code in PR #13192:
URL: https://github.com/apache/kafka/pull/13192#discussion_r1106476284


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MetadataFetcher.java:
##########
@@ -0,0 +1,805 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.StaleMetadataException;
+import org.apache.kafka.clients.consumer.LogTruncationException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.OffsetForEpochResult;
+import org.apache.kafka.clients.consumer.internals.SubscriptionState.FetchPosition;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
+import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
+import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
+import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+public class MetadataFetcher {

Review Comment:
   😱 Thanks for catching the complete lack of documentation!



[GitHub] [kafka] kirktrue commented on a diff in pull request #13192: KAFKA-14675: Extract metadata-related tasks from Fetcher into MetadataFetcher 1/4

Posted by "kirktrue (via GitHub)" <gi...@apache.org>.
kirktrue commented on code in PR #13192:
URL: https://github.com/apache/kafka/pull/13192#discussion_r1107920035


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MetadataFetcher.java:
##########
@@ -0,0 +1,805 @@
+public class MetadataFetcher {
+
+    private final Logger log;
+    private final ConsumerMetadata metadata;
+    private final SubscriptionState subscriptions;
+    private final ConsumerNetworkClient client;
+    private final Time time;
+    private final long retryBackoffMs;
+    private final long requestTimeoutMs;
+    private final IsolationLevel isolationLevel;
+    private final AtomicReference<RuntimeException> cachedListOffsetsException = new AtomicReference<>();
+    private final AtomicReference<RuntimeException> cachedOffsetForLeaderException = new AtomicReference<>();
+    private final OffsetsForLeaderEpochClient offsetsForLeaderEpochClient;
+    private final ApiVersions apiVersions;
+    private final AtomicInteger metadataUpdateVersion = new AtomicInteger(-1);

Review Comment:
   Unfortunately, there are a couple of test classes that call `sendFetches` too, so I had to make a similar method there as well 🤷‍♂️ 




[GitHub] [kafka] hachikuji commented on a diff in pull request #13192: KAFKA-14675: Extract metadata-related tasks from Fetcher into MetadataFetcher 1/4

Posted by "hachikuji (via GitHub)" <gi...@apache.org>.
hachikuji commented on code in PR #13192:
URL: https://github.com/apache/kafka/pull/13192#discussion_r1110451025


##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -1269,6 +1283,11 @@ private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetad
         }
     }
 
+    private int sendFetches() {
+        offsetFetcher.validatePositionsOnMetadataChange();

Review Comment:
   The relocation of this makes me wonder if it's needed at all. We already call the same method in `updateFetchPositions`, which is invoked prior to `sendFetches`. I tried removing it locally and all tests still pass. It's probably not a good idea to remove it here in the refactor, but maybe we could do it in a follow-up. That would simplify the `OffsetFetcher` API a little.
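
One way to see why the second call may be redundant: assuming `validatePositionsOnMetadataChange` gates its work on the metadata update version (the `metadataUpdateVersion` `AtomicInteger` field in the `MetadataFetcher` hunk above suggests this), a second call with no metadata update in between does nothing. The class below is a hypothetical, self-contained sketch of that pattern, not the `OffsetFetcher` code; the `IntSupplier` stands in for reading the consumer metadata's update version.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

public class VersionGatedValidationSketch {

    private final IntSupplier metadataUpdateVersion;                     // stand-in for the metadata update version
    private final AtomicInteger lastSeenVersion = new AtomicInteger(-1);
    private int validationPasses = 0;                                    // how often validation actually ran

    VersionGatedValidationSketch(IntSupplier metadataUpdateVersion) {
        this.metadataUpdateVersion = metadataUpdateVersion;
    }

    // Re-validates positions only if the metadata version changed since the previous call.
    void validatePositionsOnMetadataChange() {
        int current = metadataUpdateVersion.getAsInt();
        if (lastSeenVersion.getAndSet(current) != current) {
            validationPasses++; // placeholder for re-validating each assigned partition against its current leader
        }
    }

    int validationPasses() {
        return validationPasses;
    }

    public static void main(String[] args) {
        AtomicInteger version = new AtomicInteger(5);
        VersionGatedValidationSketch sketch = new VersionGatedValidationSketch(version::get);

        sketch.validatePositionsOnMetadataChange(); // e.g. from updateFetchPositions(): does the work
        sketch.validatePositionsOnMetadataChange(); // e.g. from sendFetches(): no-op, version unchanged
        System.out.println(sketch.validationPasses()); // 1

        version.incrementAndGet();                  // a metadata update is processed in between
        sketch.validatePositionsOnMetadataChange(); // does the work again
        System.out.println(sketch.validationPasses()); // 2
    }
}
```

If a metadata update is processed between `updateFetchPositions` and `sendFetches`, the second call would do real work again, which is one more reason to make the removal a separate, tested change.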




[GitHub] [kafka] kirktrue commented on a diff in pull request #13192: KAFKA-14675: Extract metadata-related tasks from Fetcher into MetadataFetcher 1/4

Posted by "kirktrue (via GitHub)" <gi...@apache.org>.
kirktrue commented on code in PR #13192:
URL: https://github.com/apache/kafka/pull/13192#discussion_r1107920411


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MetadataFetcher.java:
##########
@@ -0,0 +1,805 @@
+public class MetadataFetcher {

Review Comment:
   I have split `MetadataFetcher` yet further, into `OffsetFetcher` and `TopicMetadataFetcher`, and added brief class-level Javadoc comments to both.




[GitHub] [kafka] kirktrue commented on pull request #13192: KAFKA-14675: Extract metadata-related tasks from Fetcher into MetadataFetcher

Posted by "kirktrue (via GitHub)" <gi...@apache.org>.
kirktrue commented on PR #13192:
URL: https://github.com/apache/kafka/pull/13192#issuecomment-1414573568

   cc @hachikuji @rajinisivaram @philipnee 



[GitHub] [kafka] hachikuji commented on a diff in pull request #13192: KAFKA-14675: Extract metadata-related tasks from Fetcher into MetadataFetcher 1/4

Posted by "hachikuji (via GitHub)" <gi...@apache.org>.
hachikuji commented on code in PR #13192:
URL: https://github.com/apache/kafka/pull/13192#discussion_r1110454390


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java:
##########
@@ -252,6 +230,11 @@ public void teardown() throws Exception {
         }
     }
 
+    private int sendFetches() {
+        offsetFetcher.validatePositionsOnMetadataChange();

Review Comment:
   Got it. The tests are probably the main reason we called the method from `sendFetches`. What we're really depending on is the transition to `Fetching` in `SubscriptionState`.
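
A simplified model of that dependency: the states below are modeled on the per-partition fetch states in `SubscriptionState`, but `validatePosition` is a hypothetical reduction of the real logic. It assumes that a position without epoch information on both sides has nothing to validate against and goes straight to `FETCHING`. In a test that sets positions without leader epochs, that transition is what makes partitions fetchable before `sendFetches` runs.

```java
import java.util.Optional;

public class FetchStateSketch {

    // Modeled on the per-partition fetch states in SubscriptionState (simplified).
    enum FetchState { INITIALIZING, FETCHING, AWAIT_RESET, AWAIT_VALIDATION }

    // In this model, only partitions in FETCHING are included in a fetch request.
    static boolean isFetchable(FetchState state) {
        return state == FetchState.FETCHING;
    }

    // Hypothetical reduction of position validation: without epochs on both sides there is
    // nothing to check against, so the partition moves straight to FETCHING.
    static FetchState validatePosition(Optional<Integer> positionEpoch, Optional<Integer> leaderEpoch) {
        if (positionEpoch.isPresent() && leaderEpoch.isPresent())
            return FetchState.AWAIT_VALIDATION; // needs an OffsetsForLeaderEpoch round trip first
        return FetchState.FETCHING;
    }

    public static void main(String[] args) {
        FetchState noEpochs = validatePosition(Optional.empty(), Optional.empty());
        FetchState withEpochs = validatePosition(Optional.of(3), Optional.of(4));

        System.out.println(noEpochs + " fetchable=" + isFetchable(noEpochs));     // FETCHING fetchable=true
        System.out.println(withEpochs + " fetchable=" + isFetchable(withEpochs)); // AWAIT_VALIDATION fetchable=false
    }
}
```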




[GitHub] [kafka] kirktrue commented on a diff in pull request #13192: KAFKA-14675: Extract metadata-related tasks from Fetcher into MetadataFetcher 1/4

Posted by "kirktrue (via GitHub)" <gi...@apache.org>.
kirktrue commented on code in PR #13192:
URL: https://github.com/apache/kafka/pull/13192#discussion_r1107920860


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MetadataFetcher.java:
##########
@@ -0,0 +1,805 @@
+public class MetadataFetcher {
+
+    private final Logger log;
+    private final ConsumerMetadata metadata;
+    private final SubscriptionState subscriptions;
+    private final ConsumerNetworkClient client;
+    private final Time time;
+    private final long retryBackoffMs;
+    private final long requestTimeoutMs;
+    private final IsolationLevel isolationLevel;
+    private final AtomicReference<RuntimeException> cachedListOffsetsException = new AtomicReference<>();
+    private final AtomicReference<RuntimeException> cachedOffsetForLeaderException = new AtomicReference<>();
+    private final OffsetsForLeaderEpochClient offsetsForLeaderEpochClient;
+    private final ApiVersions apiVersions;
+    private final AtomicInteger metadataUpdateVersion = new AtomicInteger(-1);
+
+    public MetadataFetcher(LogContext logContext,
+                           ConsumerNetworkClient client,
+                           ConsumerMetadata metadata,
+                           SubscriptionState subscriptions,
+                           Time time,
+                           long retryBackoffMs,
+                           long requestTimeoutMs,
+                           IsolationLevel isolationLevel,
+                           ApiVersions apiVersions) {
+        this.log = logContext.logger(getClass());
+        this.time = time;
+        this.client = client;
+        this.metadata = metadata;
+        this.subscriptions = subscriptions;
+        this.retryBackoffMs = retryBackoffMs;
+        this.requestTimeoutMs = requestTimeoutMs;
+        this.isolationLevel = isolationLevel;
+        this.apiVersions = apiVersions;
+        this.offsetsForLeaderEpochClient = new OffsetsForLeaderEpochClient(client, logContext);
+    }
+
+    /**
+     * Represents data about an offset returned by a broker.
+     */
+    static class ListOffsetData {
+        final long offset;
+        final Long timestamp; //  null if the broker does not support returning timestamps
+        final Optional<Integer> leaderEpoch; // empty if the leader epoch is not known
+
+        ListOffsetData(long offset, Long timestamp, Optional<Integer> leaderEpoch) {
+            this.offset = offset;
+            this.timestamp = timestamp;
+            this.leaderEpoch = leaderEpoch;
+        }
+    }
+
+    private Long offsetResetStrategyTimestamp(final TopicPartition partition) {
+        OffsetResetStrategy strategy = subscriptions.resetStrategy(partition);
+        if (strategy == OffsetResetStrategy.EARLIEST)
+            return ListOffsetsRequest.EARLIEST_TIMESTAMP;
+        else if (strategy == OffsetResetStrategy.LATEST)
+            return ListOffsetsRequest.LATEST_TIMESTAMP;
+        else
+            return null;
+    }
+
+    private OffsetResetStrategy timestampToOffsetResetStrategy(long timestamp) {
+        if (timestamp == ListOffsetsRequest.EARLIEST_TIMESTAMP)
+            return OffsetResetStrategy.EARLIEST;
+        else if (timestamp == ListOffsetsRequest.LATEST_TIMESTAMP)
+            return OffsetResetStrategy.LATEST;
+        else
+            return null;
+    }
+
+    /**
+     * Get topic metadata for all topics in the cluster
+     * @param timer Timer bounding how long this method can block
+     * @return The map of topics with their partition information
+     */
+    public Map<String, List<PartitionInfo>> getAllTopicMetadata(Timer timer) {
+        return getTopicMetadata(MetadataRequest.Builder.allTopics(), timer);
+    }
+
+    /**
+     * Get metadata for all topics present in Kafka cluster
+     *
+     * @param request The MetadataRequest to send
+     * @param timer Timer bounding how long this method can block
+     * @return The map of topics with their partition information
+     */
+    public Map<String, List<PartitionInfo>> getTopicMetadata(MetadataRequest.Builder request, Timer timer) {
+        // Save the round trip if no topics are requested.
+        if (!request.isAllTopics() && request.emptyTopicList())
+            return Collections.emptyMap();
+
+        do {
+            RequestFuture<ClientResponse> future = sendMetadataRequest(request);
+            client.poll(future, timer);
+
+            if (future.failed() && !future.isRetriable())
+                throw future.exception();
+
+            if (future.succeeded()) {
+                MetadataResponse response = (MetadataResponse) future.value().responseBody();
+                Cluster cluster = response.buildCluster();
+
+                Set<String> unauthorizedTopics = cluster.unauthorizedTopics();
+                if (!unauthorizedTopics.isEmpty())
+                    throw new TopicAuthorizationException(unauthorizedTopics);
+
+                boolean shouldRetry = false;
+                Map<String, Errors> errors = response.errors();
+                if (!errors.isEmpty()) {
+                    // if there were errors, we need to check whether they were fatal or whether
+                    // we should just retry
+
+                    log.debug("Topic metadata fetch included errors: {}", errors);
+
+                    for (Map.Entry<String, Errors> errorEntry : errors.entrySet()) {
+                        String topic = errorEntry.getKey();
+                        Errors error = errorEntry.getValue();
+
+                        if (error == Errors.INVALID_TOPIC_EXCEPTION)
+                            throw new InvalidTopicException("Topic '" + topic + "' is invalid");
+                        else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION)
+                            // if a requested topic is unknown, we just continue and let it be absent
+                            // in the returned map
+                            continue;
+                        else if (error.exception() instanceof RetriableException)
+                            shouldRetry = true;
+                        else
+                            throw new KafkaException("Unexpected error fetching metadata for topic " + topic,
+                                    error.exception());
+                    }
+                }
+
+                if (!shouldRetry) {
+                    HashMap<String, List<PartitionInfo>> topicsPartitionInfos = new HashMap<>();
+                    for (String topic : cluster.topics())
+                        topicsPartitionInfos.put(topic, cluster.partitionsForTopic(topic));
+                    return topicsPartitionInfos;
+                }
+            }
+
+            timer.sleep(retryBackoffMs);
+        } while (timer.notExpired());
+
+        throw new TimeoutException("Timeout expired while fetching topic metadata");
+    }
+
+    /**
+     * Send Metadata Request to least loaded node in Kafka cluster asynchronously
+     * @return A future that indicates result of sent metadata request
+     */
+    private RequestFuture<ClientResponse> sendMetadataRequest(MetadataRequest.Builder request) {
+        final Node node = client.leastLoadedNode();
+        if (node == null)
+            return RequestFuture.noBrokersAvailable();
+        else
+            return client.send(node, request);
+    }
+
+    /**
+     * Reset offsets for all assigned partitions that require it.
+     *
+     * @throws org.apache.kafka.clients.consumer.NoOffsetForPartitionException If no offset reset strategy is defined
+     *   and one or more partitions aren't awaiting a seekToBeginning() or seekToEnd().
+     */
+    public void resetOffsetsIfNeeded() {
+        // Raise exception from previous offset fetch if there is one
+        RuntimeException exception = cachedListOffsetsException.getAndSet(null);
+        if (exception != null)
+            throw exception;
+
+        Set<TopicPartition> partitions = subscriptions.partitionsNeedingReset(time.milliseconds());
+        if (partitions.isEmpty())
+            return;
+
+        final Map<TopicPartition, Long> offsetResetTimestamps = new HashMap<>();
+        for (final TopicPartition partition : partitions) {
+            Long timestamp = offsetResetStrategyTimestamp(partition);
+            if (timestamp != null)
+                offsetResetTimestamps.put(partition, timestamp);
+        }
+
+        resetOffsetsAsync(offsetResetTimestamps);
+    }
+
+    /**
+     * Validate offsets for all assigned partitions for which a leader change has been detected.
+     */
+    public void validateOffsetsIfNeeded() {

Review Comment:
   I had assumed that offsets and positions were synonyms. Is that incorrect?
   
   I'll rename the methods as suggested.
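
On offsets versus positions: in `SubscriptionState` a position is a `FetchPosition` (imported in the hunk above), which pairs the next offset to fetch with leader-epoch information. That extra context is what lets the consumer validate a position after a leader change and detect log truncation, so the two terms overlap without being interchangeable. The class below is a simplified, hypothetical stand-in; the real class tracks the current leader node as well as its epoch.

```java
import java.util.Optional;

public class PositionVsOffsetSketch {

    // Simplified, hypothetical stand-in for SubscriptionState.FetchPosition.
    static final class FetchPosition {
        final long offset;                   // next offset to fetch
        final Optional<Integer> offsetEpoch; // leader epoch of the last consumed record, if known
        final Optional<Integer> leaderEpoch; // epoch of the leader this position was last checked against

        FetchPosition(long offset, Optional<Integer> offsetEpoch, Optional<Integer> leaderEpoch) {
            this.offset = offset;
            this.offsetEpoch = offsetEpoch;
            this.leaderEpoch = leaderEpoch;
        }

        // After a leader change the log may have been truncated, so the same offset needs re-checking.
        boolean needsValidationAgainst(Optional<Integer> currentLeaderEpoch) {
            return !leaderEpoch.equals(currentLeaderEpoch);
        }
    }

    public static void main(String[] args) {
        FetchPosition position = new FetchPosition(42L, Optional.of(7), Optional.of(7));

        System.out.println(position.offset);                                  // 42
        System.out.println(position.needsValidationAgainst(Optional.of(7))); // false
        System.out.println(position.needsValidationAgainst(Optional.of(8))); // true
    }
}
```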




[GitHub] [kafka] hachikuji commented on a diff in pull request #13192: KAFKA-14675: Extract metadata-related tasks from Fetcher into MetadataFetcher 1/4

Posted by "hachikuji (via GitHub)" <gi...@apache.org>.
hachikuji commented on code in PR #13192:
URL: https://github.com/apache/kafka/pull/13192#discussion_r1110451696


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java:
##########
@@ -0,0 +1,717 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.StaleMetadataException;
+import org.apache.kafka.clients.consumer.LogTruncationException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.OffsetForEpochResult;
+import org.apache.kafka.clients.consumer.internals.SubscriptionState.FetchPosition;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
+import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
+import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
+import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+/**
+ * {@link OffsetFetcher} is responsible for fetching the {@link OffsetAndTimestamp offsets} for
+ * a given set of {@link TopicPartition topic and partition pairs} and for validation and resetting of positions,
+ * as needed.
+ */
+public class OffsetFetcher {
+
+    private final Logger log;
+    private final ConsumerMetadata metadata;
+    private final SubscriptionState subscriptions;
+    private final ConsumerNetworkClient client;
+    private final Time time;
+    private final long retryBackoffMs;
+    private final long requestTimeoutMs;
+    private final IsolationLevel isolationLevel;
+    private final AtomicReference<RuntimeException> cachedListOffsetsException = new AtomicReference<>();
+    private final AtomicReference<RuntimeException> cachedOffsetForLeaderException = new AtomicReference<>();
+    private final OffsetsForLeaderEpochClient offsetsForLeaderEpochClient;
+    private final ApiVersions apiVersions;
+    private final AtomicInteger metadataUpdateVersion = new AtomicInteger(-1);
+
+    public OffsetFetcher(LogContext logContext,
+                         ConsumerNetworkClient client,
+                         ConsumerMetadata metadata,
+                         SubscriptionState subscriptions,
+                         Time time,
+                         long retryBackoffMs,
+                         long requestTimeoutMs,
+                         IsolationLevel isolationLevel,
+                         ApiVersions apiVersions) {
+        this.log = logContext.logger(getClass());
+        this.time = time;
+        this.client = client;
+        this.metadata = metadata;
+        this.subscriptions = subscriptions;
+        this.retryBackoffMs = retryBackoffMs;
+        this.requestTimeoutMs = requestTimeoutMs;
+        this.isolationLevel = isolationLevel;
+        this.apiVersions = apiVersions;
+        this.offsetsForLeaderEpochClient = new OffsetsForLeaderEpochClient(client, logContext);
+    }
+
+    /**
+     * Represents data about an offset returned by a broker.
+     */
+    static class ListOffsetData {
+        final long offset;
+        final Long timestamp; //  null if the broker does not support returning timestamps

Review Comment:
   I agree with doing it separately. Best to keep the refactor as dumb as possible.





[GitHub] [kafka] hachikuji merged pull request #13192: KAFKA-14675: Extract metadata-related tasks from Fetcher into MetadataFetcher 1/4

Posted by "hachikuji (via GitHub)" <gi...@apache.org>.
hachikuji merged PR #13192:
URL: https://github.com/apache/kafka/pull/13192




[GitHub] [kafka] kirktrue commented on a diff in pull request #13192: KAFKA-14675: Extract metadata-related tasks from Fetcher into MetadataFetcher 1/4

Posted by "kirktrue (via GitHub)" <gi...@apache.org>.
kirktrue commented on code in PR #13192:
URL: https://github.com/apache/kafka/pull/13192#discussion_r1107862255


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MetadataFetcher.java:
##########
@@ -0,0 +1,805 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.StaleMetadataException;
+import org.apache.kafka.clients.consumer.LogTruncationException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.OffsetForEpochResult;
+import org.apache.kafka.clients.consumer.internals.SubscriptionState.FetchPosition;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
+import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
+import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
+import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+public class MetadataFetcher {
+
+    private final Logger log;
+    private final ConsumerMetadata metadata;
+    private final SubscriptionState subscriptions;
+    private final ConsumerNetworkClient client;
+    private final Time time;
+    private final long retryBackoffMs;
+    private final long requestTimeoutMs;
+    private final IsolationLevel isolationLevel;
+    private final AtomicReference<RuntimeException> cachedListOffsetsException = new AtomicReference<>();
+    private final AtomicReference<RuntimeException> cachedOffsetForLeaderException = new AtomicReference<>();
+    private final OffsetsForLeaderEpochClient offsetsForLeaderEpochClient;
+    private final ApiVersions apiVersions;
+    private final AtomicInteger metadataUpdateVersion = new AtomicInteger(-1);

Review Comment:
   Done.





[GitHub] [kafka] philipnee commented on a diff in pull request #13192: KAFKA-14675: Extract metadata-related tasks from Fetcher into MetadataFetcher 1/4

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13192:
URL: https://github.com/apache/kafka/pull/13192#discussion_r1107930541


##########
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##########
@@ -1249,7 +1263,7 @@ private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetad
                     //
                     // NOTE: since the consumed position has already been updated, we must not allow

Review Comment:
   This is old documentation. I wonder if it is a bit misleading, since we don't really have a consumed position anymore. @hachikuji 





[GitHub] [kafka] guozhangwang commented on a diff in pull request #13192: KAFKA-14675: Extract metadata-related tasks from Fetcher into MetadataFetcher 1/4

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on code in PR #13192:
URL: https://github.com/apache/kafka/pull/13192#discussion_r1109189751


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java:
##########
@@ -0,0 +1,717 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.StaleMetadataException;
+import org.apache.kafka.clients.consumer.LogTruncationException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.OffsetForEpochResult;
+import org.apache.kafka.clients.consumer.internals.SubscriptionState.FetchPosition;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
+import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
+import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
+import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+/**
+ * {@link OffsetFetcher} is responsible for fetching the {@link OffsetAndTimestamp offsets} for
+ * a given set of {@link TopicPartition topic and partition pairs} and for validation and resetting of positions,
+ * as needed.
+ */
+public class OffsetFetcher {
+
+    private final Logger log;
+    private final ConsumerMetadata metadata;
+    private final SubscriptionState subscriptions;
+    private final ConsumerNetworkClient client;
+    private final Time time;
+    private final long retryBackoffMs;
+    private final long requestTimeoutMs;
+    private final IsolationLevel isolationLevel;
+    private final AtomicReference<RuntimeException> cachedListOffsetsException = new AtomicReference<>();
+    private final AtomicReference<RuntimeException> cachedOffsetForLeaderException = new AtomicReference<>();
+    private final OffsetsForLeaderEpochClient offsetsForLeaderEpochClient;
+    private final ApiVersions apiVersions;
+    private final AtomicInteger metadataUpdateVersion = new AtomicInteger(-1);
+
+    public OffsetFetcher(LogContext logContext,
+                         ConsumerNetworkClient client,
+                         ConsumerMetadata metadata,
+                         SubscriptionState subscriptions,
+                         Time time,
+                         long retryBackoffMs,
+                         long requestTimeoutMs,
+                         IsolationLevel isolationLevel,
+                         ApiVersions apiVersions) {
+        this.log = logContext.logger(getClass());
+        this.time = time;
+        this.client = client;
+        this.metadata = metadata;
+        this.subscriptions = subscriptions;
+        this.retryBackoffMs = retryBackoffMs;
+        this.requestTimeoutMs = requestTimeoutMs;
+        this.isolationLevel = isolationLevel;
+        this.apiVersions = apiVersions;
+        this.offsetsForLeaderEpochClient = new OffsetsForLeaderEpochClient(client, logContext);
+    }
+
+    /**
+     * Represents data about an offset returned by a broker.
+     */
+    static class ListOffsetData {
+        final long offset;
+        final Long timestamp; //  null if the broker does not support returning timestamps

Review Comment:
   I feel neutral about this. If we generally prefer `Optional` over nullable non-primitives, then let's do it.
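If the `Optional` route is taken, the change to `ListOffsetData` might look roughly like the following. This is a hypothetical, standalone sketch: `ListOffsetDataSketch` and the `describe` helper are illustrative names, not part of Kafka, and only the two fields visible in the diff (`offset` and `timestamp`) are modeled.

```java
import java.util.Optional;

// Hypothetical sketch: ListOffsetData with an Optional timestamp instead of a
// nullable Long. Not the merged Kafka code.
public class ListOffsetDataSketch {

    static class ListOffsetData {
        final long offset;
        // Empty when the broker does not support returning timestamps
        // (replaces the "null if unsupported" convention).
        final Optional<Long> timestamp;

        ListOffsetData(long offset, Optional<Long> timestamp) {
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    // Hypothetical helper: call sites must handle the "no timestamp" case
    // explicitly instead of remembering to null-check.
    static String describe(ListOffsetData data) {
        return data.timestamp
                .map(ts -> "offset=" + data.offset + ", timestamp=" + ts)
                .orElse("offset=" + data.offset + ", timestamp=<unsupported>");
    }

    public static void main(String[] args) {
        System.out.println(describe(new ListOffsetData(42L, Optional.of(1000L))));
        System.out.println(describe(new ListOffsetData(42L, Optional.empty())));
    }
}
```

The trade-off is a small wrapper allocation per instance in exchange for making the "broker returned no timestamp" case impossible to overlook at call sites.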





[GitHub] [kafka] hachikuji commented on a diff in pull request #13192: KAFKA-14675: Extract metadata-related tasks from Fetcher into MetadataFetcher 1/4

Posted by "hachikuji (via GitHub)" <gi...@apache.org>.
hachikuji commented on code in PR #13192:
URL: https://github.com/apache/kafka/pull/13192#discussion_r1106467125


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MetadataFetcher.java:
##########
@@ -0,0 +1,805 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.StaleMetadataException;
+import org.apache.kafka.clients.consumer.LogTruncationException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.OffsetForEpochResult;
+import org.apache.kafka.clients.consumer.internals.SubscriptionState.FetchPosition;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
+import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
+import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
+import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+public class MetadataFetcher {

Review Comment:
   Some high-level documentation for this class would be useful. "Metadata" is such an overloaded term that we could probably justify sticking anything in here. Would it make sense to call it `OffsetManager` or something like that instead? It seems like most of the logic here is offset bookkeeping.





[GitHub] [kafka] hachikuji commented on a diff in pull request #13192: KAFKA-14675: Extract metadata-related tasks from Fetcher into MetadataFetcher 1/4

Posted by "hachikuji (via GitHub)" <gi...@apache.org>.
hachikuji commented on code in PR #13192:
URL: https://github.com/apache/kafka/pull/13192#discussion_r1106464011


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MetadataFetcher.java:
##########
@@ -0,0 +1,805 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.StaleMetadataException;
+import org.apache.kafka.clients.consumer.LogTruncationException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.OffsetForEpochResult;
+import org.apache.kafka.clients.consumer.internals.SubscriptionState.FetchPosition;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
+import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
+import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
+import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+public class MetadataFetcher {
+
+    private final Logger log;
+    private final ConsumerMetadata metadata;
+    private final SubscriptionState subscriptions;
+    private final ConsumerNetworkClient client;
+    private final Time time;
+    private final long retryBackoffMs;
+    private final long requestTimeoutMs;
+    private final IsolationLevel isolationLevel;
+    private final AtomicReference<RuntimeException> cachedListOffsetsException = new AtomicReference<>();
+    private final AtomicReference<RuntimeException> cachedOffsetForLeaderException = new AtomicReference<>();
+    private final OffsetsForLeaderEpochClient offsetsForLeaderEpochClient;
+    private final ApiVersions apiVersions;
+    private final AtomicInteger metadataUpdateVersion = new AtomicInteger(-1);

Review Comment:
   This field is only used in `validatePositionsOnMetadataChange`, which seems to be duplicated in both `Fetcher` and `MetadataFetcher`. I think we can drop the duplicate method in `Fetcher`. Instead, we can add a method like this to `KafkaConsumer`:
   
   ```java
   private int sendFetches() {
     metadataFetcher.validatePositionsOnMetadataChange();
     return fetcher.sendFetches();
   }
   ```
   Does that make sense?
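The role of `metadataUpdateVersion` in this suggestion can be sketched in isolation. This is a minimal, self-contained sketch, assuming the field acts as a "last seen metadata version" so that position validation runs only after the metadata has actually changed. `FakeMetadata`, `validatedAtVersions`, and the placeholder body of `sendFetches()` are hypothetical stand-ins, not Kafka's `ConsumerMetadata`, `SubscriptionState`, or `Fetcher` APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of "validate positions only when metadata changed".
public class MetadataVersionGateSketch {

    // Stand-in for consumer metadata: bumps a version counter on every update.
    static class FakeMetadata {
        private int updateVersion = 0;
        int updateVersion() { return updateVersion; }
        void update() { updateVersion++; }
    }

    private final FakeMetadata metadata;
    // Starting at -1 guarantees the first call sees a "change" and validates once.
    private final AtomicInteger metadataUpdateVersion = new AtomicInteger(-1);
    // Records the metadata versions at which validation actually ran.
    final List<Integer> validatedAtVersions = new ArrayList<>();

    MetadataVersionGateSketch(FakeMetadata metadata) {
        this.metadata = metadata;
    }

    // Runs validation only if the metadata version differs from the last one seen.
    void validatePositionsOnMetadataChange() {
        int newVersion = metadata.updateVersion();
        if (metadataUpdateVersion.getAndSet(newVersion) != newVersion) {
            // A real consumer would revalidate each assigned partition's position
            // against its current leader here; this sketch just records the run.
            validatedAtVersions.add(newVersion);
        }
    }

    // Mirrors the suggested KafkaConsumer wrapper: validate first, then fetch.
    int sendFetches() {
        validatePositionsOnMetadataChange();
        return 0; // hypothetical placeholder for fetcher.sendFetches()
    }

    public static void main(String[] args) {
        FakeMetadata metadata = new FakeMetadata();
        MetadataVersionGateSketch consumer = new MetadataVersionGateSketch(metadata);
        consumer.sendFetches(); // first call: validates at version 0
        consumer.sendFetches(); // no metadata change: skipped
        metadata.update();
        consumer.sendFetches(); // metadata changed: validates at version 1
        System.out.println(consumer.validatedAtVersions); // [0, 1]
    }
}
```

In this sketch, `getAndSet` means only the first caller after a metadata update observes the version change, so the check is cheap enough to run on every `sendFetches()` call.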
   
   
   
   





[GitHub] [kafka] kirktrue commented on a diff in pull request #13192: KAFKA-14675: Extract metadata-related tasks from Fetcher into MetadataFetcher 1/4

Posted by "kirktrue (via GitHub)" <gi...@apache.org>.
kirktrue commented on code in PR #13192:
URL: https://github.com/apache/kafka/pull/13192#discussion_r1110423708


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java:
##########
@@ -0,0 +1,717 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.StaleMetadataException;
+import org.apache.kafka.clients.consumer.LogTruncationException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.OffsetForEpochResult;
+import org.apache.kafka.clients.consumer.internals.SubscriptionState.FetchPosition;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
+import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
+import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
+import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+/**
+ * {@link OffsetFetcher} is responsible for fetching the {@link OffsetAndTimestamp offsets} for
+ * a given set of {@link TopicPartition topic and partition pairs} and for validation and resetting of positions,
+ * as needed.
+ */
+public class OffsetFetcher {
+
+    private final Logger log;
+    private final ConsumerMetadata metadata;
+    private final SubscriptionState subscriptions;
+    private final ConsumerNetworkClient client;
+    private final Time time;
+    private final long retryBackoffMs;
+    private final long requestTimeoutMs;
+    private final IsolationLevel isolationLevel;
+    private final AtomicReference<RuntimeException> cachedListOffsetsException = new AtomicReference<>();
+    private final AtomicReference<RuntimeException> cachedOffsetForLeaderException = new AtomicReference<>();
+    private final OffsetsForLeaderEpochClient offsetsForLeaderEpochClient;
+    private final ApiVersions apiVersions;
+    private final AtomicInteger metadataUpdateVersion = new AtomicInteger(-1);
+
+    public OffsetFetcher(LogContext logContext,
+                         ConsumerNetworkClient client,
+                         ConsumerMetadata metadata,
+                         SubscriptionState subscriptions,
+                         Time time,
+                         long retryBackoffMs,
+                         long requestTimeoutMs,
+                         IsolationLevel isolationLevel,
+                         ApiVersions apiVersions) {
+        this.log = logContext.logger(getClass());
+        this.time = time;
+        this.client = client;
+        this.metadata = metadata;
+        this.subscriptions = subscriptions;
+        this.retryBackoffMs = retryBackoffMs;
+        this.requestTimeoutMs = requestTimeoutMs;
+        this.isolationLevel = isolationLevel;
+        this.apiVersions = apiVersions;
+        this.offsetsForLeaderEpochClient = new OffsetsForLeaderEpochClient(client, logContext);
+    }
+
+    /**
+     * Represents data about an offset returned by a broker.
+     */
+    static class ListOffsetData {
+        final long offset;
+        final Long timestamp; //  null if the broker does not support returning timestamps

Review Comment:
   I'd like to do that, but I'm reluctant to make any more changes to this PR so that it can be approved as-is.





[GitHub] [kafka] kirktrue commented on a diff in pull request #13192: KAFKA-14675: Extract metadata-related tasks from Fetcher into MetadataFetcher 1/4

Posted by "kirktrue (via GitHub)" <gi...@apache.org>.
kirktrue commented on code in PR #13192:
URL: https://github.com/apache/kafka/pull/13192#discussion_r1109086646


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetFetcher.java:
##########
@@ -0,0 +1,717 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.StaleMetadataException;
+import org.apache.kafka.clients.consumer.LogTruncationException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.OffsetForEpochResult;
+import org.apache.kafka.clients.consumer.internals.SubscriptionState.FetchPosition;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
+import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
+import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
+import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+/**
+ * {@link OffsetFetcher} is responsible for fetching the {@link OffsetAndTimestamp offsets} for
+ * a given set of {@link TopicPartition topic and partition pairs} and for validation and resetting of positions,
+ * as needed.
+ */
+public class OffsetFetcher {
+
+    private final Logger log;
+    private final ConsumerMetadata metadata;
+    private final SubscriptionState subscriptions;
+    private final ConsumerNetworkClient client;
+    private final Time time;
+    private final long retryBackoffMs;
+    private final long requestTimeoutMs;
+    private final IsolationLevel isolationLevel;
+    private final AtomicReference<RuntimeException> cachedListOffsetsException = new AtomicReference<>();
+    private final AtomicReference<RuntimeException> cachedOffsetForLeaderException = new AtomicReference<>();
+    private final OffsetsForLeaderEpochClient offsetsForLeaderEpochClient;
+    private final ApiVersions apiVersions;
+    private final AtomicInteger metadataUpdateVersion = new AtomicInteger(-1);
+
+    public OffsetFetcher(LogContext logContext,
+                         ConsumerNetworkClient client,
+                         ConsumerMetadata metadata,
+                         SubscriptionState subscriptions,
+                         Time time,
+                         long retryBackoffMs,
+                         long requestTimeoutMs,
+                         IsolationLevel isolationLevel,
+                         ApiVersions apiVersions) {
+        this.log = logContext.logger(getClass());
+        this.time = time;
+        this.client = client;
+        this.metadata = metadata;
+        this.subscriptions = subscriptions;
+        this.retryBackoffMs = retryBackoffMs;
+        this.requestTimeoutMs = requestTimeoutMs;
+        this.isolationLevel = isolationLevel;
+        this.apiVersions = apiVersions;
+        this.offsetsForLeaderEpochClient = new OffsetsForLeaderEpochClient(client, logContext);
+    }
+
+    /**
+     * Represents data about an offset returned by a broker.
+     */
+    static class ListOffsetData {
+        final long offset;
+        final Long timestamp; //  null if the broker does not support returning timestamps

Review Comment:
   @guozhangwang @hachikuji What do you think about introducing `Optional` here?
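   As a minimal sketch of the suggestion (not code from this PR), `ListOffsetData` could model the missing timestamp as `Optional<Long>`, the same way it already models `leaderEpoch`. The `describe` helper is hypothetical. It only shows that callers would have to handle the absent case explicitly instead of risking a `NullPointerException` when unboxing a `null` `Long`.

```java
import java.util.Optional;

// Hypothetical variant of ListOffsetData: an absent timestamp is modeled
// with Optional<Long> instead of a nullable Long.
final class ListOffsetData {
    final long offset;
    final Optional<Long> timestamp;      // empty if the broker does not support returning timestamps
    final Optional<Integer> leaderEpoch; // empty if the leader epoch is not known

    ListOffsetData(long offset, Optional<Long> timestamp, Optional<Integer> leaderEpoch) {
        this.offset = offset;
        this.timestamp = timestamp;
        this.leaderEpoch = leaderEpoch;
    }

    // Hypothetical helper: the type forces the "no timestamp" case to be handled.
    String describe() {
        return offset + timestamp.map(ts -> "@" + ts).orElse("@unknown");
    }
}
```

   With this shape, a reader of the class no longer needs the trailing comment to know that `timestamp` may be missing.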




[GitHub] [kafka] guozhangwang commented on a diff in pull request #13192: KAFKA-14675: Extract metadata-related tasks from Fetcher into MetadataFetcher 1/4

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on code in PR #13192:
URL: https://github.com/apache/kafka/pull/13192#discussion_r1107569600


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MetadataFetcher.java:
##########
@@ -0,0 +1,805 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.StaleMetadataException;
+import org.apache.kafka.clients.consumer.LogTruncationException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.OffsetForEpochResult;
+import org.apache.kafka.clients.consumer.internals.SubscriptionState.FetchPosition;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
+import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
+import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
+import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+public class MetadataFetcher {
+
+    private final Logger log;
+    private final ConsumerMetadata metadata;
+    private final SubscriptionState subscriptions;
+    private final ConsumerNetworkClient client;
+    private final Time time;
+    private final long retryBackoffMs;
+    private final long requestTimeoutMs;
+    private final IsolationLevel isolationLevel;
+    private final AtomicReference<RuntimeException> cachedListOffsetsException = new AtomicReference<>();
+    private final AtomicReference<RuntimeException> cachedOffsetForLeaderException = new AtomicReference<>();
+    private final OffsetsForLeaderEpochClient offsetsForLeaderEpochClient;
+    private final ApiVersions apiVersions;
+    private final AtomicInteger metadataUpdateVersion = new AtomicInteger(-1);
+
+    public MetadataFetcher(LogContext logContext,
+                           ConsumerNetworkClient client,
+                           ConsumerMetadata metadata,
+                           SubscriptionState subscriptions,
+                           Time time,
+                           long retryBackoffMs,
+                           long requestTimeoutMs,
+                           IsolationLevel isolationLevel,
+                           ApiVersions apiVersions) {
+        this.log = logContext.logger(getClass());
+        this.time = time;
+        this.client = client;
+        this.metadata = metadata;
+        this.subscriptions = subscriptions;
+        this.retryBackoffMs = retryBackoffMs;
+        this.requestTimeoutMs = requestTimeoutMs;
+        this.isolationLevel = isolationLevel;
+        this.apiVersions = apiVersions;
+        this.offsetsForLeaderEpochClient = new OffsetsForLeaderEpochClient(client, logContext);
+    }
+
+    /**
+     * Represents data about an offset returned by a broker.
+     */
+    static class ListOffsetData {
+        final long offset;
+        final Long timestamp; //  null if the broker does not support returning timestamps
+        final Optional<Integer> leaderEpoch; // empty if the leader epoch is not known
+
+        ListOffsetData(long offset, Long timestamp, Optional<Integer> leaderEpoch) {
+            this.offset = offset;
+            this.timestamp = timestamp;
+            this.leaderEpoch = leaderEpoch;
+        }
+    }
+
+    private Long offsetResetStrategyTimestamp(final TopicPartition partition) {
+        OffsetResetStrategy strategy = subscriptions.resetStrategy(partition);
+        if (strategy == OffsetResetStrategy.EARLIEST)
+            return ListOffsetsRequest.EARLIEST_TIMESTAMP;
+        else if (strategy == OffsetResetStrategy.LATEST)
+            return ListOffsetsRequest.LATEST_TIMESTAMP;
+        else
+            return null;
+    }
+
+    private OffsetResetStrategy timestampToOffsetResetStrategy(long timestamp) {
+        if (timestamp == ListOffsetsRequest.EARLIEST_TIMESTAMP)
+            return OffsetResetStrategy.EARLIEST;
+        else if (timestamp == ListOffsetsRequest.LATEST_TIMESTAMP)
+            return OffsetResetStrategy.LATEST;
+        else
+            return null;
+    }
+
+    /**
+     * Get topic metadata for all topics in the cluster
+     * @param timer Timer bounding how long this method can block
+     * @return The map of topics with their partition information
+     */
+    public Map<String, List<PartitionInfo>> getAllTopicMetadata(Timer timer) {
+        return getTopicMetadata(MetadataRequest.Builder.allTopics(), timer);
+    }
+
+    /**
+     * Get metadata for all topics present in Kafka cluster
+     *
+     * @param request The MetadataRequest to send
+     * @param timer Timer bounding how long this method can block
+     * @return The map of topics with their partition information
+     */
+    public Map<String, List<PartitionInfo>> getTopicMetadata(MetadataRequest.Builder request, Timer timer) {
+        // Save the round trip if no topics are requested.
+        if (!request.isAllTopics() && request.emptyTopicList())
+            return Collections.emptyMap();
+
+        do {
+            RequestFuture<ClientResponse> future = sendMetadataRequest(request);
+            client.poll(future, timer);
+
+            if (future.failed() && !future.isRetriable())
+                throw future.exception();
+
+            if (future.succeeded()) {
+                MetadataResponse response = (MetadataResponse) future.value().responseBody();
+                Cluster cluster = response.buildCluster();
+
+                Set<String> unauthorizedTopics = cluster.unauthorizedTopics();
+                if (!unauthorizedTopics.isEmpty())
+                    throw new TopicAuthorizationException(unauthorizedTopics);
+
+                boolean shouldRetry = false;
+                Map<String, Errors> errors = response.errors();
+                if (!errors.isEmpty()) {
+                    // if there were errors, we need to check whether they were fatal or whether
+                    // we should just retry
+
+                    log.debug("Topic metadata fetch included errors: {}", errors);
+
+                    for (Map.Entry<String, Errors> errorEntry : errors.entrySet()) {
+                        String topic = errorEntry.getKey();
+                        Errors error = errorEntry.getValue();
+
+                        if (error == Errors.INVALID_TOPIC_EXCEPTION)
+                            throw new InvalidTopicException("Topic '" + topic + "' is invalid");
+                        else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION)
+                            // if a requested topic is unknown, we just continue and let it be absent
+                            // in the returned map
+                            continue;
+                        else if (error.exception() instanceof RetriableException)
+                            shouldRetry = true;
+                        else
+                            throw new KafkaException("Unexpected error fetching metadata for topic " + topic,
+                                    error.exception());
+                    }
+                }
+
+                if (!shouldRetry) {
+                    HashMap<String, List<PartitionInfo>> topicsPartitionInfos = new HashMap<>();
+                    for (String topic : cluster.topics())
+                        topicsPartitionInfos.put(topic, cluster.partitionsForTopic(topic));
+                    return topicsPartitionInfos;
+                }
+            }
+
+            timer.sleep(retryBackoffMs);
+        } while (timer.notExpired());
+
+        throw new TimeoutException("Timeout expired while fetching topic metadata");
+    }
+
+    /**
+     * Send Metadata Request to least loaded node in Kafka cluster asynchronously
+     * @return A future that indicates result of sent metadata request
+     */
+    private RequestFuture<ClientResponse> sendMetadataRequest(MetadataRequest.Builder request) {
+        final Node node = client.leastLoadedNode();
+        if (node == null)
+            return RequestFuture.noBrokersAvailable();
+        else
+            return client.send(node, request);
+    }
+
+    /**
+     * Reset offsets for all assigned partitions that require it.
+     *
+     * @throws org.apache.kafka.clients.consumer.NoOffsetForPartitionException If no offset reset strategy is defined
+     *   and one or more partitions aren't awaiting a seekToBeginning() or seekToEnd().
+     */
+    public void resetOffsetsIfNeeded() {
+        // Raise exception from previous offset fetch if there is one
+        RuntimeException exception = cachedListOffsetsException.getAndSet(null);
+        if (exception != null)
+            throw exception;
+
+        Set<TopicPartition> partitions = subscriptions.partitionsNeedingReset(time.milliseconds());
+        if (partitions.isEmpty())
+            return;
+
+        final Map<TopicPartition, Long> offsetResetTimestamps = new HashMap<>();
+        for (final TopicPartition partition : partitions) {
+            Long timestamp = offsetResetStrategyTimestamp(partition);
+            if (timestamp != null)
+                offsetResetTimestamps.put(partition, timestamp);
+        }
+
+        resetOffsetsAsync(offsetResetTimestamps);
+    }
+
+    /**
+     * Validate offsets for all assigned partitions for which a leader change has been detected.
+     */
+    public void validateOffsetsIfNeeded() {

Review Comment:
   I'd recommend renaming functions like `validateOffsetsIfNeeded` / `resetOffsetsIfNeeded` here by replacing `Offset` with `Position`, because 1) we validate / reset not just the offset value but the whole position, including the epoch, etc., and 2) going forward, this distinguishes them from other `offsets`, such as the ones in fetch responses.
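   A minimal sketch of the distinction behind the rename. It assumes hypothetical `ResetStrategy` and `Position` types standing in for `OffsetResetStrategy` and `SubscriptionState.FetchPosition`. The reset strategy maps to a sentinel ListOffsets timestamp (-2 for earliest, -1 for latest, matching `ListOffsetsRequest.EARLIEST_TIMESTAMP` and `LATEST_TIMESTAMP`), as `offsetResetStrategyTimestamp` does above. What a reset produces, however, is a whole position that carries both an offset and a leader epoch.

```java
import java.util.Optional;

// Hypothetical stand-ins for OffsetResetStrategy and SubscriptionState.FetchPosition.
enum ResetStrategy { EARLIEST, LATEST, NONE }

final class Position {
    final long offset;
    final Optional<Integer> leaderEpoch;

    Position(long offset, Optional<Integer> leaderEpoch) {
        this.offset = offset;
        this.leaderEpoch = leaderEpoch;
    }
}

final class PositionResetSketch {
    // Sentinel timestamps understood by the ListOffsets API.
    static final long EARLIEST_TIMESTAMP = -2L;
    static final long LATEST_TIMESTAMP = -1L;

    // Mirrors offsetResetStrategyTimestamp(): no timestamp when there is no strategy.
    static Optional<Long> resetTimestamp(ResetStrategy strategy) {
        switch (strategy) {
            case EARLIEST: return Optional.of(EARLIEST_TIMESTAMP);
            case LATEST:   return Optional.of(LATEST_TIMESTAMP);
            default:       return Optional.empty();
        }
    }

    // A reset replaces the offset and the epoch together, which is why
    // "Position" describes the operation better than "Offset".
    static Position resetPosition(long offsetFromBroker, Optional<Integer> epochFromBroker) {
        return new Position(offsetFromBroker, epochFromBroker);
    }
}
```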



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MetadataFetcher.java:
##########
@@ -0,0 +1,805 @@
+public class MetadataFetcher {

Review Comment:
   What about calling it `TopicMetadataFetcher` to distinguish it from other kinds of metadata?
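   If the class were narrowed to topic metadata as suggested, its core would be the per-topic error handling in `getTopicMetadata()`. Below is a simplified, self-contained sketch of that decision logic. `TopicError`, `Decision`, and `TopicMetadataErrorSketch` are hypothetical names. Treating only `LEADER_NOT_AVAILABLE` as retriable is an assumption that stands in for the real `error.exception() instanceof RetriableException` check.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-ins for the error codes seen in getTopicMetadata().
enum TopicError { INVALID_TOPIC_EXCEPTION, UNKNOWN_TOPIC_OR_PARTITION, LEADER_NOT_AVAILABLE, CLUSTER_AUTHORIZATION_FAILED }

// What the fetch loop does with one MetadataResponse.
enum Decision { USE_RESPONSE, RETRY, THROW }

final class TopicMetadataErrorSketch {
    // Assumption for this sketch: only LEADER_NOT_AVAILABLE counts as retriable.
    static boolean isRetriable(TopicError error) {
        return error == TopicError.LEADER_NOT_AVAILABLE;
    }

    // Mirrors the per-topic checks: unknown topics are left out of the result,
    // retriable errors request another round after the backoff, and anything
    // else (including an invalid topic) is fatal.
    static Decision decide(Map<String, TopicError> errors) {
        boolean shouldRetry = false;
        for (TopicError error : errors.values()) {
            if (error == TopicError.UNKNOWN_TOPIC_OR_PARTITION)
                continue;
            else if (isRetriable(error))
                shouldRetry = true;
            else
                return Decision.THROW;
        }
        return shouldRetry ? Decision.RETRY : Decision.USE_RESPONSE;
    }
}
```

   As in the original loop, a fatal error wins over a retriable one, because the throw happens as soon as the fatal entry is seen, whatever the iteration order.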


