Posted to jira@kafka.apache.org by "kirktrue (via GitHub)" <gi...@apache.org> on 2023/02/24 21:12:40 UTC

[GitHub] [kafka] kirktrue opened a new pull request, #13301: KAFKA-14758: Extract inner classes from Fetcher for reuse in refactoring

kirktrue opened a new pull request, #13301:
URL: https://github.com/apache/kafka/pull/13301

   The `Fetcher` class is used internally by the `KafkaConsumer` to fetch records from the brokers. There is ongoing work to create a new consumer implementation with a significantly refactored threading model. The threading refactor work requires a similarly refactored `Fetcher`.
   
   This task refactors `Fetcher` by extracting its inner classes into top-level classes (still in the `internals` package) so that they can be referenced by the forthcoming refactored fetch logic.
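   
   As a rough before/after sketch (abbreviated, hypothetical code; `CompletedFetch` is one of the classes actually extracted in this PR):
   
   ```java
   // Before the refactor: per-partition fetch state lives in a private inner
   // class, so only Fetcher itself can use it.
   //
   //     public class Fetcher<K, V> {
   //         private class CompletedFetch { /* per-partition fetch state */ }
   //     }
   
   // After the refactor: the class is top-level but package-private, so the
   // forthcoming refactored fetch logic in the same internals package can
   // reuse it, while it remains invisible to code outside the package.
   package org.apache.kafka.clients.consumer.internals;
   
   class CompletedFetch<K, V> {
       // per-partition fetch state, now independently reusable and testable
   }
   ```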
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] guozhangwang merged pull request #13301: KAFKA-14758: Extract inner classes from Fetcher for reuse in refactoring

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang merged PR #13301:
URL: https://github.com/apache/kafka/pull/13301




[GitHub] [kafka] kirktrue commented on a diff in pull request #13301: KAFKA-14758: Extract inner classes from Fetcher for reuse in refactoring

Posted by "kirktrue (via GitHub)" <gi...@apache.org>.
kirktrue commented on code in PR #13301:
URL: https://github.com/apache/kafka/pull/13301#discussion_r1122517751


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.CloseableIterator;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+/**
+ * {@link CompletedFetch} represents a {@link RecordBatch batch} of {@link Record records} that was returned from the
+ * broker via a {@link FetchRequest}. It contains logic to maintain state between calls to {@link #fetchRecords(int)}.
+ *
+ * @param <K> Record key type
+ * @param <V> Record value type
+ */
+class CompletedFetch<K, V> {
+
+    private final Logger log;
+    private final SubscriptionState subscriptions;
+    private final boolean checkCrcs;
+    private final BufferSupplier decompressionBufferSupplier;
+    private final Deserializer<K> keyDeserializer;
+    private final Deserializer<V> valueDeserializer;
+    private final IsolationLevel isolationLevel;
+    public final TopicPartition partition;

Review Comment:
   All fields are now either package-private or `private`, and are grouped into mutable and `final` sections, for a total of four sections.
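   
   An abridged sketch of that layout, with one representative field per section (the real class has many more):
   
   ```java
   import org.apache.kafka.common.message.FetchResponseData;
   import org.apache.kafka.common.utils.LogContext;
   import org.slf4j.Logger;
   
   class CompletedFetch<K, V> {
       // 1. final and private: internal collaborators
       private final Logger log;
       // 2. final and package-private: shared with fetch logic in this package
       final FetchResponseData.PartitionData partitionData;
       // 3. mutable and private: internal bookkeeping
       private int recordsRead;
       // 4. mutable and package-private: state the fetcher reads and updates
       long nextFetchOffset;
   
       CompletedFetch(LogContext logContext,
                      FetchResponseData.PartitionData partitionData,
                      long fetchOffset) {
           this.log = logContext.logger(CompletedFetch.class);
           this.partitionData = partitionData;
           this.nextFetchOffset = fetchOffset;
       }
   }
   ```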



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##########
@@ -0,0 +1,356 @@
+/* ... elided: license header, imports, javadoc, and leading fields identical to the first hunk above ... */
+    private final Iterator<? extends RecordBatch> batches;
+    private final Set<Long> abortedProducerIds;
+    private final PriorityQueue<FetchResponseData.AbortedTransaction> abortedTransactions;
+    final FetchResponseData.PartitionData partitionData;
+    final FetchResponseMetricAggregator metricAggregator;
+    final short responseVersion;
+
+    private int recordsRead;
+    private int bytesRead;
+    private RecordBatch currentBatch;
+    private Record lastRecord;
+    private CloseableIterator<Record> records;
+    long nextFetchOffset;
+    Optional<Integer> lastEpoch;
+    boolean isConsumed = false;
+    private Exception cachedRecordException = null;
+    private boolean corruptLastRecord = false;
+    boolean initialized = false;
+
+    CompletedFetch(LogContext logContext,
+                   SubscriptionState subscriptions,
+                   boolean checkCrcs,
+                   BufferSupplier decompressionBufferSupplier,
+                   Deserializer<K> keyDeserializer,
+                   Deserializer<V> valueDeserializer,
+                   IsolationLevel isolationLevel,
+                   TopicPartition partition,
+                   FetchResponseData.PartitionData partitionData,
+                   FetchResponseMetricAggregator metricAggregator,
+                   Iterator<? extends RecordBatch> batches,
+                   Long fetchOffset,
+                   short responseVersion) {
+        this.log = logContext.logger(CompletedFetch.class);
+        this.subscriptions = subscriptions;
+        this.checkCrcs = checkCrcs;
+        this.decompressionBufferSupplier = decompressionBufferSupplier;
+        this.keyDeserializer = keyDeserializer;
+        this.valueDeserializer = valueDeserializer;
+        this.isolationLevel = isolationLevel;
+        this.partition = partition;
+        this.partitionData = partitionData;
+        this.metricAggregator = metricAggregator;
+        this.batches = batches;
+        this.nextFetchOffset = fetchOffset;
+        this.responseVersion = responseVersion;
+        this.lastEpoch = Optional.empty();
+        this.abortedProducerIds = new HashSet<>();
+        this.abortedTransactions = abortedTransactions(partitionData);
+    }
+
+    /**
+     * Draining a {@link CompletedFetch} will signal that the data has been consumed and the underlying resources
+     * are closed.
+     *
+     * <p/>
+     *
+     * TODO: Is this the same as close()-ing the CompletedFetch?

Review Comment:
   I've updated the comment, but I'm not going to introduce any error throwing if the fetch is used after a `drain`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##########
@@ -0,0 +1,356 @@
+/* ... elided: license header, imports, javadoc, and fields identical to the hunks above ... */
+    final FetchResponseData.PartitionData partitionData;
+    final FetchResponseMetricAggregator metricAggregator;

Review Comment:
   I added a new method named `recordAggregatedMetrics` that wraps the call to `metricAggregator` so that it is not exposed. I am also calling the method internally.
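   
   A minimal sketch of the wrapper, assuming it mirrors the existing `metricAggregator.record(...)` call in `drain()` (the exact signature in the PR may differ):
   
   ```java
   // Inside CompletedFetch: callers record fetch-level metrics through this
   // method instead of reaching into metricAggregator directly.
   void recordAggregatedMetrics(int bytes, int records) {
       metricAggregator.record(partition, bytes, records);
   }
   ```
   
   `drain()` can then call `recordAggregatedMetrics(bytesRead, recordsRead)` rather than touching the aggregator itself.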



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##########
@@ -0,0 +1,356 @@
+/* ... elided: license header, imports, javadoc, fields, and constructor identical to the hunks above ... */
+    /**
+     * Draining a {@link CompletedFetch} will signal that the data has been consumed and the underlying resources
+     * are closed.
+     *
+     * <p/>
+     *
+     * TODO: Is this the same as close()-ing the CompletedFetch?
+     * TODO: Is the fetch usable after it's consumed? Should there be some kind of error thrown if it's used after
+     *       it's been drained?
+     */
+    void drain() {
+        if (!isConsumed) {
+            maybeCloseRecordStream();
+            cachedRecordException = null;
+            this.isConsumed = true;
+            this.metricAggregator.record(partition, bytesRead, recordsRead);
+
+            // we move the partition to the end if we received some bytes. This way, it's more likely that partitions
+            // for the same topic can remain together (allowing for more efficient serialization).
+            if (bytesRead > 0)
+                subscriptions.movePartitionToEnd(partition);
+        }
+    }
+
+    private void maybeEnsureValid(RecordBatch batch) {
+        if (checkCrcs && currentBatch.magic() >= RecordBatch.MAGIC_VALUE_V2) {

Review Comment:
   Per the previous comment, I'll leave this for now. There are a lot of places where that pattern could be adopted, even in just this one file, so I don't want to change it only partially.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##########
@@ -0,0 +1,356 @@
+/* ... elided: license header, imports, fields, constructor, and drain() javadoc identical to the hunks above ... */
+    void drain() {
+        if (!isConsumed) {

Review Comment:
   Agreed. I'd like to change as little as possible at this time, if that's OK.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##########
@@ -0,0 +1,356 @@
+/* ... elided: license header, imports, javadoc, and leading fields identical to the hunks above ... */
+    final FetchResponseData.PartitionData partitionData;
+    final FetchResponseMetricAggregator metricAggregator;
+    final short responseVersion;

Review Comment:
   Yes, I corrected it to be `requestVersion`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchManagerMetrics.java:
##########
@@ -0,0 +1,212 @@
+/* ... elided: standard ASF license header, identical to the one above ... */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Value;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The {@link FetchManagerMetrics} class provides wrapper methods to record lag, lead, latency, and fetch metrics.
+ * It keeps an internal ID of the assigned set of partitions which is updated to ensure the set of metrics it
+ * records matches up with the topic-partitions in use.
+ */
+class FetchManagerMetrics {
+
+    private final Metrics metrics;
+    private final FetcherMetricsRegistry metricsRegistry;
+    private final Sensor bytesFetched;
+    private final Sensor recordsFetched;
+    private final Sensor fetchLatency;
+    private final Sensor recordsFetchLag;
+    private final Sensor recordsFetchLead;
+
+    private int assignmentId = 0;
+    private Set<TopicPartition> assignedPartitions = Collections.emptySet();

Review Comment:
   I'm not sure I totally understand the concept, sorry. Are you suggesting we do something like this:
   
   ```java
       void maybeUpdateAssignment(SubscriptionState subscription) {
           Set<TopicPartition> partitions = subscription.assignedPartitions();
   
           for (MetricName metricName : metrics.metrics().keySet()) {
               TopicPartition tp = parseTopicPartitionFromMetricName(metricName);
               
               if (!partitions.contains(tp)) {
                   metrics.removeSensor(partitionLagMetricName(tp));
                   metrics.removeSensor(partitionLeadMetricName(tp));
                   metrics.removeMetric(partitionPreferredReadReplicaMetricName(tp));
               }
           }
   
           for (TopicPartition tp : partitions) {
               MetricName metricName = partitionPreferredReadReplicaMetricName(tp);
               metrics.addMetricIfAbsent(
                       metricName,
                       null,
                       (Gauge<Integer>) (config, now) -> subscription.preferredReadReplica(tp, 0L).orElse(-1)
               );
           }
       }
   ```





[GitHub] [kafka] guozhangwang commented on pull request #13301: KAFKA-14758: Extract inner classes from Fetcher for reuse in refactoring

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on PR #13301:
URL: https://github.com/apache/kafka/pull/13301#issuecomment-1453981950

   Got it, thanks for confirming. The rule of thumb is that any code that could run in production should have test coverage. So as long as we can close the test gaps before the 3.5 release, I think we can skip the tests in this PR.




[GitHub] [kafka] guozhangwang commented on a diff in pull request #13301: KAFKA-14758: Extract inner classes from Fetcher for reuse in refactoring

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on code in PR #13301:
URL: https://github.com/apache/kafka/pull/13301#discussion_r1117834119


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##########
@@ -0,0 +1,356 @@
+/* ... elided: license header, imports, javadoc, and leading fields identical to the hunks above ... */
+    final FetchResponseData.PartitionData partitionData;
+    final FetchResponseMetricAggregator metricAggregator;
+    final short responseVersion;

Review Comment:
   The name here seems a bit misleading; I think it should be `requestVersion`, right?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchManagerMetrics.java:
##########
@@ -0,0 +1,212 @@
+/* ... elided: license header, imports, javadoc, and sensor fields identical to the FetchManagerMetrics hunk above ... */
+    private int assignmentId = 0;
+    private Set<TopicPartition> assignedPartitions = Collections.emptySet();

Review Comment:
   This is not introduced by this PR, but I wonder if it would be better, i.e. stricter, to go over all the metrics, match each one's topic-partition name against the `newAssignedPartitions`, and remove it if necessary, rather than remembering the old assignment and looping over only those partitions, given that `maybeUpdateAssignment` can now be triggered from different callers.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##########
@@ -0,0 +1,356 @@
+/* ... elided: license header, imports, javadoc, fields, and constructor identical to the hunks above ... */
+    /**
+     * Draining a {@link CompletedFetch} will signal that the data has been consumed and the underlying resources
+     * are closed.
+     *
+     * <p/>
+     *
+     * TODO: Is this the same as close()-ing the CompletedFetch?

Review Comment:
   I'd say `yes` and `no`, i.e. it's disposable and not reusable.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##########
@@ -0,0 +1,356 @@
+/* ... elided: license header, imports, javadoc, and leading fields identical to the hunks above ... */
+    final FetchResponseData.PartitionData partitionData;
+    final FetchResponseMetricAggregator metricAggregator;

Review Comment:
   Exposing `metricAggregator` just so callers can record a failure when the error is not `NONE`, or when we do not have a valid position, feels like overkill. What about just adding a function to `CompletedFetch`, like `recordErrorResponse`?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##########
@@ -0,0 +1,356 @@
+/* ... elided: license header, imports, javadoc, and leading fields identical to the hunks above ... */
+    private final IsolationLevel isolationLevel;
+    public final TopicPartition partition;

Review Comment:
   Could `partition` here be package-private as well?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchManagerMetrics.java:
##########
@@ -0,0 +1,212 @@
+/* ... elided: license header, imports, javadoc, and sensor fields identical to the FetchManagerMetrics hunks above ... */
+    private int assignmentId = 0;
+    private Set<TopicPartition> assignedPartitions = Collections.emptySet();

Review Comment:
   Ditto with remembering the `assignmentId`.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.CloseableIterator;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+/**
+ * {@link CompletedFetch} represents a {@link RecordBatch batch} of {@link Record records} that was returned from the
+ * broker via a {@link FetchRequest}. It contains logic to maintain state between calls to {@link #fetchRecords(int)}.
+ *
+ * @param <K> Record key type
+ * @param <V> Record value type
+ */
+class CompletedFetch<K, V> {
+
+    private final Logger log;
+    private final SubscriptionState subscriptions;
+    private final boolean checkCrcs;
+    private final BufferSupplier decompressionBufferSupplier;
+    private final Deserializer<K> keyDeserializer;
+    private final Deserializer<V> valueDeserializer;
+    private final IsolationLevel isolationLevel;
+    public final TopicPartition partition;

Review Comment:
   Very nit: could we also group the package-private and mutable field declarations together?
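   For example, a sketch with the same fields from this PR, just regrouped:
   ```java
   class CompletedFetch<K, V> {
       // private final collaborators
       private final Logger log;
       private final SubscriptionState subscriptions;
       private final boolean checkCrcs;
       private final BufferSupplier decompressionBufferSupplier;
       private final Deserializer<K> keyDeserializer;
       private final Deserializer<V> valueDeserializer;
       private final IsolationLevel isolationLevel;
       private final Iterator<? extends RecordBatch> batches;
       private final Set<Long> abortedProducerIds;
       private final PriorityQueue<FetchResponseData.AbortedTransaction> abortedTransactions;

       // package-private immutable state
       final TopicPartition partition;
       final FetchResponseData.PartitionData partitionData;
       final FetchResponseMetricAggregator metricAggregator;
       final short responseVersion;

       // private mutable state
       private int recordsRead;
       private int bytesRead;
       private RecordBatch currentBatch;
       private Record lastRecord;
       private CloseableIterator<Record> records;
       private Exception cachedRecordException = null;
       private boolean corruptLastRecord = false;

       // package-private mutable state
       long nextFetchOffset;
       Optional<Integer> lastEpoch;
       boolean isConsumed = false;
       boolean initialized = false;

       // (constructor and methods unchanged)
   }
   ```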



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java:
##########
@@ -768,448 +728,6 @@ public static Sensor throttleTimeSensor(Metrics metrics, FetcherMetricsRegistry
         return fetchThrottleTimeSensor;
     }
 
-    private class CompletedFetch {
-        private final TopicPartition partition;
-        private final Iterator<? extends RecordBatch> batches;
-        private final Set<Long> abortedProducerIds;
-        private final PriorityQueue<FetchResponseData.AbortedTransaction> abortedTransactions;
-        private final FetchResponseData.PartitionData partitionData;
-        private final FetchResponseMetricAggregator metricAggregator;
-        private final short responseVersion;
-
-        private int recordsRead;
-        private int bytesRead;
-        private RecordBatch currentBatch;
-        private Record lastRecord;
-        private CloseableIterator<Record> records;
-        private long nextFetchOffset;
-        private Optional<Integer> lastEpoch;
-        private boolean isConsumed = false;
-        private Exception cachedRecordException = null;
-        private boolean corruptLastRecord = false;
-        private boolean initialized = false;
-
-        private CompletedFetch(TopicPartition partition,
-                               FetchResponseData.PartitionData partitionData,
-                               FetchResponseMetricAggregator metricAggregator,
-                               Iterator<? extends RecordBatch> batches,
-                               Long fetchOffset,
-                               short responseVersion) {
-            this.partition = partition;
-            this.partitionData = partitionData;
-            this.metricAggregator = metricAggregator;
-            this.batches = batches;
-            this.nextFetchOffset = fetchOffset;
-            this.responseVersion = responseVersion;
-            this.lastEpoch = Optional.empty();
-            this.abortedProducerIds = new HashSet<>();
-            this.abortedTransactions = abortedTransactions(partitionData);
-        }
-
-        private void drain() {
-            if (!isConsumed) {
-                maybeCloseRecordStream();
-                cachedRecordException = null;
-                this.isConsumed = true;
-                this.metricAggregator.record(partition, bytesRead, recordsRead);
-
-                // we move the partition to the end if we received some bytes. This way, it's more likely that partitions
-                // for the same topic can remain together (allowing for more efficient serialization).
-                if (bytesRead > 0)
-                    subscriptions.movePartitionToEnd(partition);
-            }
-        }
-
-        private void maybeEnsureValid(RecordBatch batch) {
-            if (checkCrcs && currentBatch.magic() >= RecordBatch.MAGIC_VALUE_V2) {
-                try {
-                    batch.ensureValid();
-                } catch (CorruptRecordException e) {
-                    throw new KafkaException("Record batch for partition " + partition + " at offset " +
-                            batch.baseOffset() + " is invalid, cause: " + e.getMessage());
-                }
-            }
-        }
-
-        private void maybeEnsureValid(Record record) {
-            if (checkCrcs) {
-                try {
-                    record.ensureValid();
-                } catch (CorruptRecordException e) {
-                    throw new KafkaException("Record for partition " + partition + " at offset " + record.offset()
-                            + " is invalid, cause: " + e.getMessage());
-                }
-            }
-        }
-
-        private void maybeCloseRecordStream() {
-            if (records != null) {
-                records.close();
-                records = null;
-            }
-        }
-
-        private Record nextFetchedRecord() {
-            while (true) {
-                if (records == null || !records.hasNext()) {
-                    maybeCloseRecordStream();
-
-                    if (!batches.hasNext()) {
-                        // Message format v2 preserves the last offset in a batch even if the last record is removed
-                        // through compaction. By using the next offset computed from the last offset in the batch,
-                        // we ensure that the offset of the next fetch will point to the next batch, which avoids
-                        // unnecessary re-fetching of the same batch (in the worst case, the consumer could get stuck
-                        // fetching the same batch repeatedly).
-                        if (currentBatch != null)
-                            nextFetchOffset = currentBatch.nextOffset();
-                        drain();
-                        return null;
-                    }
-
-                    currentBatch = batches.next();
-                    lastEpoch = currentBatch.partitionLeaderEpoch() == RecordBatch.NO_PARTITION_LEADER_EPOCH ?
-                            Optional.empty() : Optional.of(currentBatch.partitionLeaderEpoch());
-
-                    maybeEnsureValid(currentBatch);
-
-                    if (isolationLevel == IsolationLevel.READ_COMMITTED && currentBatch.hasProducerId()) {
-                        // remove from the aborted transaction queue all aborted transactions which have begun
-                        // before the current batch's last offset and add the associated producerIds to the
-                        // aborted producer set
-                        consumeAbortedTransactionsUpTo(currentBatch.lastOffset());
-
-                        long producerId = currentBatch.producerId();
-                        if (containsAbortMarker(currentBatch)) {
-                            abortedProducerIds.remove(producerId);
-                        } else if (isBatchAborted(currentBatch)) {
-                            log.debug("Skipping aborted record batch from partition {} with producerId {} and " +
-                                          "offsets {} to {}",
-                                      partition, producerId, currentBatch.baseOffset(), currentBatch.lastOffset());
-                            nextFetchOffset = currentBatch.nextOffset();
-                            continue;
-                        }
-                    }
-
-                    records = currentBatch.streamingIterator(decompressionBufferSupplier);
-                } else {
-                    Record record = records.next();
-                    // skip any records out of range
-                    if (record.offset() >= nextFetchOffset) {
-                        // we only do validation when the message should not be skipped.
-                        maybeEnsureValid(record);
-
-                        // control records are not returned to the user
-                        if (!currentBatch.isControlBatch()) {
-                            return record;
-                        } else {
-                            // Increment the next fetch offset when we skip a control batch.
-                            nextFetchOffset = record.offset() + 1;
-                        }
-                    }
-                }
-            }
-        }
-
-        private List<ConsumerRecord<K, V>> fetchRecords(int maxRecords) {
-            // Error when fetching the next record before deserialization.
-            if (corruptLastRecord)
-                throw new KafkaException("Received exception when fetching the next record from " + partition
-                                             + ". If needed, please seek past the record to "
-                                             + "continue consumption.", cachedRecordException);
-
-            if (isConsumed)
-                return Collections.emptyList();
-
-            List<ConsumerRecord<K, V>> records = new ArrayList<>();
-            try {
-                for (int i = 0; i < maxRecords; i++) {
-                    // Only move to next record if there was no exception in the last fetch. Otherwise we should
-                    // use the last record to do deserialization again.
-                    if (cachedRecordException == null) {
-                        corruptLastRecord = true;
-                        lastRecord = nextFetchedRecord();
-                        corruptLastRecord = false;
-                    }
-                    if (lastRecord == null)
-                        break;
-                    records.add(parseRecord(partition, currentBatch, lastRecord));
-                    recordsRead++;
-                    bytesRead += lastRecord.sizeInBytes();
-                    nextFetchOffset = lastRecord.offset() + 1;
-                    // In some cases, the deserialization may have thrown an exception and the retry may succeed,
-                    // we allow user to move forward in this case.
-                    cachedRecordException = null;
-                }
-            } catch (SerializationException se) {
-                cachedRecordException = se;
-                if (records.isEmpty())
-                    throw se;
-            } catch (KafkaException e) {
-                cachedRecordException = e;
-                if (records.isEmpty())
-                    throw new KafkaException("Received exception when fetching the next record from " + partition
-                                                 + ". If needed, please seek past the record to "
-                                                 + "continue consumption.", e);
-            }
-            return records;
-        }
-
-        private void consumeAbortedTransactionsUpTo(long offset) {
-            if (abortedTransactions == null)
-                return;
-
-            while (!abortedTransactions.isEmpty() && abortedTransactions.peek().firstOffset() <= offset) {
-                FetchResponseData.AbortedTransaction abortedTransaction = abortedTransactions.poll();
-                abortedProducerIds.add(abortedTransaction.producerId());
-            }
-        }
-
-        private boolean isBatchAborted(RecordBatch batch) {
-            return batch.isTransactional() && abortedProducerIds.contains(batch.producerId());
-        }
-
-        private PriorityQueue<FetchResponseData.AbortedTransaction> abortedTransactions(FetchResponseData.PartitionData partition) {
-            if (partition.abortedTransactions() == null || partition.abortedTransactions().isEmpty())
-                return null;
-
-            PriorityQueue<FetchResponseData.AbortedTransaction> abortedTransactions = new PriorityQueue<>(
-                    partition.abortedTransactions().size(), Comparator.comparingLong(FetchResponseData.AbortedTransaction::firstOffset)
-            );
-            abortedTransactions.addAll(partition.abortedTransactions());
-            return abortedTransactions;
-        }
-
-        private boolean containsAbortMarker(RecordBatch batch) {
-            if (!batch.isControlBatch())
-                return false;
-
-            Iterator<Record> batchIterator = batch.iterator();
-            if (!batchIterator.hasNext())
-                return false;
-
-            Record firstRecord = batchIterator.next();
-            return ControlRecordType.ABORT == ControlRecordType.parse(firstRecord.key());
-        }
-
-        private boolean notInitialized() {
-            return !this.initialized;
-        }
-    }
-
-    /**
-     * Since we parse the message data for each partition from each fetch response lazily, fetch-level
-     * metrics need to be aggregated as the messages from each partition are parsed. This class is used
-     * to facilitate this incremental aggregation.
-     */
-    private static class FetchResponseMetricAggregator {
-        private final FetchManagerMetrics sensors;
-        private final Set<TopicPartition> unrecordedPartitions;
-
-        private final FetchMetrics fetchMetrics = new FetchMetrics();
-        private final Map<String, FetchMetrics> topicFetchMetrics = new HashMap<>();
-
-        private FetchResponseMetricAggregator(FetchManagerMetrics sensors,
-                                              Set<TopicPartition> partitions) {
-            this.sensors = sensors;
-            this.unrecordedPartitions = partitions;
-        }
-
-        /**
-         * After each partition is parsed, we update the current metric totals with the total bytes
-         * and number of records parsed. After all partitions have reported, we write the metric.
-         */
-        public void record(TopicPartition partition, int bytes, int records) {
-            this.unrecordedPartitions.remove(partition);
-            this.fetchMetrics.increment(bytes, records);
-
-            // collect and aggregate per-topic metrics
-            String topic = partition.topic();
-            FetchMetrics topicFetchMetric = this.topicFetchMetrics.get(topic);
-            if (topicFetchMetric == null) {
-                topicFetchMetric = new FetchMetrics();
-                this.topicFetchMetrics.put(topic, topicFetchMetric);
-            }
-            topicFetchMetric.increment(bytes, records);
-
-            if (this.unrecordedPartitions.isEmpty()) {
-                // once all expected partitions from the fetch have reported in, record the metrics
-                this.sensors.bytesFetched.record(this.fetchMetrics.fetchBytes);
-                this.sensors.recordsFetched.record(this.fetchMetrics.fetchRecords);
-
-                // also record per-topic metrics
-                for (Map.Entry<String, FetchMetrics> entry: this.topicFetchMetrics.entrySet()) {
-                    FetchMetrics metric = entry.getValue();
-                    this.sensors.recordTopicFetchMetrics(entry.getKey(), metric.fetchBytes, metric.fetchRecords);
-                }
-            }
-        }
-
-        private static class FetchMetrics {
-            private int fetchBytes;
-            private int fetchRecords;
-
-            protected void increment(int bytes, int records) {
-                this.fetchBytes += bytes;
-                this.fetchRecords += records;
-            }
-        }
-    }
-
-    private static class FetchManagerMetrics {

Review Comment:
   I'm assuming there's no code change at all and the class was just moved around, so I did not look into it closely.




[GitHub] [kafka] kirktrue commented on pull request #13301: KAFKA-14758: Extract inner classes from Fetcher for reuse in refactoring

Posted by "kirktrue (via GitHub)" <gi...@apache.org>.
kirktrue commented on PR #13301:
URL: https://github.com/apache/kafka/pull/13301#issuecomment-1462812863

   @guozhangwang I've added some dedicated unit tests. Keep in mind that these classes are all still used by the `Fetcher` and thus the extensive `FetcherTest` class covers them too. When running `FetcherTest` in code coverage mode, well over 95% of the code is covered by the existing tests already.



[GitHub] [kafka] philipnee commented on a diff in pull request #13301: KAFKA-14758: Extract inner classes from Fetcher for reuse in refactoring

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13301:
URL: https://github.com/apache/kafka/pull/13301#discussion_r1117866050


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.CloseableIterator;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+/**
+ * {@link CompletedFetch} represents a {@link RecordBatch batch} of {@link Record records} that was returned from the
+ * broker via a {@link FetchRequest}. It contains logic to maintain state between calls to {@link #fetchRecords(int)}.
+ *
+ * @param <K> Record key type
+ * @param <V> Record value type
+ */
+class CompletedFetch<K, V> {
+
+    private final Logger log;
+    private final SubscriptionState subscriptions;
+    private final boolean checkCrcs;
+    private final BufferSupplier decompressionBufferSupplier;
+    private final Deserializer<K> keyDeserializer;
+    private final Deserializer<V> valueDeserializer;
+    private final IsolationLevel isolationLevel;
+    public final TopicPartition partition;

Review Comment:
   I think we need it public because it is in the internal package.




[GitHub] [kafka] kirktrue commented on pull request #13301: KAFKA-14758: Extract inner classes from Fetcher for reuse in refactoring

Posted by "kirktrue (via GitHub)" <gi...@apache.org>.
kirktrue commented on PR #13301:
URL: https://github.com/apache/kafka/pull/13301#issuecomment-1444580426

   @hachikuji @rajinisivaram @guozhangwang @vvcephei @philipnee This is another PR that "simply" refactors out the inner classes from the `Fetcher`. I broke it out into a separate PR because it would otherwise add a lot of noise to the forthcoming PR.



[GitHub] [kafka] philipnee commented on a diff in pull request #13301: KAFKA-14758: Extract inner classes from Fetcher for reuse in refactoring

Posted by "philipnee (via GitHub)" <gi...@apache.org>.
philipnee commented on code in PR #13301:
URL: https://github.com/apache/kafka/pull/13301#discussion_r1117808261


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.CloseableIterator;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+/**
+ * {@link CompletedFetch} represents a {@link RecordBatch batch} of {@link Record records} that was returned from the
+ * broker via a {@link FetchRequest}. It contains logic to maintain state between calls to {@link #fetchRecords(int)}.
+ *
+ * @param <K> Record key type
+ * @param <V> Record value type
+ */
+class CompletedFetch<K, V> {
+
+    private final Logger log;
+    private final SubscriptionState subscriptions;
+    private final boolean checkCrcs;
+    private final BufferSupplier decompressionBufferSupplier;
+    private final Deserializer<K> keyDeserializer;
+    private final Deserializer<V> valueDeserializer;
+    private final IsolationLevel isolationLevel;
+    public final TopicPartition partition;
+    private final Iterator<? extends RecordBatch> batches;
+    private final Set<Long> abortedProducerIds;
+    private final PriorityQueue<FetchResponseData.AbortedTransaction> abortedTransactions;
+    final FetchResponseData.PartitionData partitionData;
+    final FetchResponseMetricAggregator metricAggregator;
+    final short responseVersion;
+
+    private int recordsRead;
+    private int bytesRead;
+    private RecordBatch currentBatch;
+    private Record lastRecord;
+    private CloseableIterator<Record> records;
+    long nextFetchOffset;
+    Optional<Integer> lastEpoch;
+    boolean isConsumed = false;
+    private Exception cachedRecordException = null;
+    private boolean corruptLastRecord = false;
+    boolean initialized = false;
+
+    CompletedFetch(LogContext logContext,
+                   SubscriptionState subscriptions,
+                   boolean checkCrcs,
+                   BufferSupplier decompressionBufferSupplier,
+                   Deserializer<K> keyDeserializer,
+                   Deserializer<V> valueDeserializer,
+                   IsolationLevel isolationLevel,
+                   TopicPartition partition,
+                   FetchResponseData.PartitionData partitionData,
+                   FetchResponseMetricAggregator metricAggregator,
+                   Iterator<? extends RecordBatch> batches,
+                   Long fetchOffset,
+                   short responseVersion) {
+        this.log = logContext.logger(CompletedFetch.class);
+        this.subscriptions = subscriptions;
+        this.checkCrcs = checkCrcs;
+        this.decompressionBufferSupplier = decompressionBufferSupplier;
+        this.keyDeserializer = keyDeserializer;
+        this.valueDeserializer = valueDeserializer;
+        this.isolationLevel = isolationLevel;
+        this.partition = partition;
+        this.partitionData = partitionData;
+        this.metricAggregator = metricAggregator;
+        this.batches = batches;
+        this.nextFetchOffset = fetchOffset;
+        this.responseVersion = responseVersion;
+        this.lastEpoch = Optional.empty();
+        this.abortedProducerIds = new HashSet<>();
+        this.abortedTransactions = abortedTransactions(partitionData);
+    }
+
+    /**
+     * Draining a {@link CompletedFetch} will signal that the data has been consumed and the underlying resources
+     * are closed.
+     *
+     * <p/>
+     *
+     * TODO: Is this the same as close()-ing the CompletedFetch?
+     * TODO: Is the fetch usable after it's consumed? Should there be some kind of error thrown if it's used after
+     *       it's been drained?
+     */
+    void drain() {
+        if (!isConsumed) {
+            maybeCloseRecordStream();
+            cachedRecordException = null;
+            this.isConsumed = true;
+            this.metricAggregator.record(partition, bytesRead, recordsRead);
+
+            // we move the partition to the end if we received some bytes. This way, it's more likely that partitions
+            // for the same topic can remain together (allowing for more efficient serialization).
+            if (bytesRead > 0)
+                subscriptions.movePartitionToEnd(partition);
+        }
+    }
+
+    private void maybeEnsureValid(RecordBatch batch) {
+        if (checkCrcs && currentBatch.magic() >= RecordBatch.MAGIC_VALUE_V2) {

Review Comment:
   Ditto, it would be better if we could return early.
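   A minimal sketch of that (same behavior as the PR code, with the condition inverted):
   ```java
   private void maybeEnsureValid(RecordBatch batch) {
       if (!checkCrcs || currentBatch.magic() < RecordBatch.MAGIC_VALUE_V2)
           return;

       try {
           batch.ensureValid();
       } catch (CorruptRecordException e) {
           throw new KafkaException("Record batch for partition " + partition + " at offset " +
                   batch.baseOffset() + " is invalid, cause: " + e.getMessage());
       }
   }
   ```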



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchManagerMetrics.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Value;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The {@link FetchManagerMetrics} class provides wrapper methods to record lag, lead, latency, and fetch metrics.
+ * It keeps an internal ID of the assigned set of partitions which is updated to ensure the set of metrics it
+ * records matches up with the topic-partitions in use.
+ */
+class FetchManagerMetrics {

Review Comment:
   Can we keep a list of the metrics that we measure here, or a link to a webpage? I feel these metrics are essential for observability but aren't very well explained/documented.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.CloseableIterator;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+/**
+ * {@link CompletedFetch} represents a {@link RecordBatch batch} of {@link Record records} that was returned from the
+ * broker via a {@link FetchRequest}. It contains logic to maintain state between calls to {@link #fetchRecords(int)}.
+ *
+ * @param <K> Record key type
+ * @param <V> Record value type
+ */
+class CompletedFetch<K, V> {
+
+    private final Logger log;
+    private final SubscriptionState subscriptions;
+    private final boolean checkCrcs;
+    private final BufferSupplier decompressionBufferSupplier;
+    private final Deserializer<K> keyDeserializer;
+    private final Deserializer<V> valueDeserializer;
+    private final IsolationLevel isolationLevel;
+    public final TopicPartition partition;
+    private final Iterator<? extends RecordBatch> batches;
+    private final Set<Long> abortedProducerIds;
+    private final PriorityQueue<FetchResponseData.AbortedTransaction> abortedTransactions;
+    final FetchResponseData.PartitionData partitionData;
+    final FetchResponseMetricAggregator metricAggregator;
+    final short responseVersion;
+
+    private int recordsRead;
+    private int bytesRead;
+    private RecordBatch currentBatch;
+    private Record lastRecord;
+    private CloseableIterator<Record> records;
+    long nextFetchOffset;
+    Optional<Integer> lastEpoch;
+    boolean isConsumed = false;
+    private Exception cachedRecordException = null;
+    private boolean corruptLastRecord = false;
+    boolean initialized = false;
+
+    CompletedFetch(LogContext logContext,
+                   SubscriptionState subscriptions,
+                   boolean checkCrcs,
+                   BufferSupplier decompressionBufferSupplier,
+                   Deserializer<K> keyDeserializer,
+                   Deserializer<V> valueDeserializer,
+                   IsolationLevel isolationLevel,
+                   TopicPartition partition,
+                   FetchResponseData.PartitionData partitionData,
+                   FetchResponseMetricAggregator metricAggregator,
+                   Iterator<? extends RecordBatch> batches,
+                   Long fetchOffset,
+                   short responseVersion) {
+        this.log = logContext.logger(CompletedFetch.class);
+        this.subscriptions = subscriptions;
+        this.checkCrcs = checkCrcs;
+        this.decompressionBufferSupplier = decompressionBufferSupplier;
+        this.keyDeserializer = keyDeserializer;
+        this.valueDeserializer = valueDeserializer;
+        this.isolationLevel = isolationLevel;
+        this.partition = partition;
+        this.partitionData = partitionData;
+        this.metricAggregator = metricAggregator;
+        this.batches = batches;
+        this.nextFetchOffset = fetchOffset;
+        this.responseVersion = responseVersion;
+        this.lastEpoch = Optional.empty();
+        this.abortedProducerIds = new HashSet<>();
+        this.abortedTransactions = abortedTransactions(partitionData);
+    }
+
+    /**
+     * Draining a {@link CompletedFetch} will signal that the data has been consumed and the underlying resources
+     * are closed.
+     *
+     * <p/>
+     *
+     * TODO: Is this the same as close()-ing the CompletedFetch?
+     * TODO: Is the fetch usable after it's consumed? Should there be some kind of error thrown if it's used after
+     *       it's been drained?
+     */
+    void drain() {
+        if (!isConsumed) {

Review Comment:
   IIRC, this is the original code, but it annoys me a bit. It would be more readable if we returned early with `if (isConsumed) return;`, as in the sketch below.
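   A sketch of how `drain()` could read with the early return (behavior unchanged from the PR code):
   ```java
   void drain() {
       if (isConsumed)
           return;

       maybeCloseRecordStream();
       cachedRecordException = null;
       isConsumed = true;
       metricAggregator.record(partition, bytesRead, recordsRead);

       // We move the partition to the end if we received some bytes. This way, it's more
       // likely that partitions for the same topic can remain together (allowing for more
       // efficient serialization).
       if (bytesRead > 0)
           subscriptions.movePartitionToEnd(partition);
   }
   ```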
   




[GitHub] [kafka] guozhangwang commented on a diff in pull request #13301: KAFKA-14758: Extract inner classes from Fetcher for reuse in refactoring

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on code in PR #13301:
URL: https://github.com/apache/kafka/pull/13301#discussion_r1132719577


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchManagerMetrics.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Value;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The {@link FetchManagerMetrics} class provides wrapper methods to record lag, lead, latency, and fetch metrics.
+ * It keeps an internal ID of the assigned set of partitions which is updated to ensure the set of metrics it
+ * records matches up with the topic-partitions in use.
+ */
+class FetchManagerMetrics {
+
+    private final Metrics metrics;
+    private final FetcherMetricsRegistry metricsRegistry;
+    private final Sensor bytesFetched;
+    private final Sensor recordsFetched;
+    private final Sensor fetchLatency;
+    private final Sensor recordsFetchLag;
+    private final Sensor recordsFetchLead;
+
+    private int assignmentId = 0;
+    private Set<TopicPartition> assignedPartitions = Collections.emptySet();

Review Comment:
   Anyway, we can defer this to a future change since the issue isn't introduced by this PR.




[GitHub] [kafka] kirktrue commented on a diff in pull request #13301: KAFKA-14758: Extract inner classes from Fetcher for reuse in refactoring

Posted by "kirktrue (via GitHub)" <gi...@apache.org>.
kirktrue commented on code in PR #13301:
URL: https://github.com/apache/kafka/pull/13301#discussion_r1122535670


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchManagerMetrics.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Value;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The {@link FetchManagerMetrics} class provides wrapper methods to record lag, lead, latency, and fetch metrics.
+ * It keeps an internal ID of the assigned set of partitions which is updated to ensure the set of metrics it
+ * records matches up with the topic-partitions in use.
+ */
+class FetchManagerMetrics {

Review Comment:
   These should come from here: https://kafka.apache.org/documentation/#consumer_fetch_monitoring
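   (That table includes the metrics these sensors back, e.g. `fetch-latency-avg`/`fetch-latency-max`, `bytes-consumed-rate`, `records-consumed-rate`, `records-lag-max`, and `records-lead-min`.)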




[GitHub] [kafka] guozhangwang commented on a diff in pull request #13301: KAFKA-14758: Extract inner classes from Fetcher for reuse in refactoring

Posted by "guozhangwang (via GitHub)" <gi...@apache.org>.
guozhangwang commented on code in PR #13301:
URL: https://github.com/apache/kafka/pull/13301#discussion_r1123543033


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchManagerMetrics.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Value;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The {@link FetchManagerMetrics} class provides wrapper methods to record lag, lead, latency, and fetch metrics.
+ * It keeps an internal ID of the assigned set of partitions which is updated to ensure the set of metrics it
+ * records matches up with the topic-partitions in use.
+ */
+class FetchManagerMetrics {
+
+    private final Metrics metrics;
+    private final FetcherMetricsRegistry metricsRegistry;
+    private final Sensor bytesFetched;
+    private final Sensor recordsFetched;
+    private final Sensor fetchLatency;
+    private final Sensor recordsFetchLag;
+    private final Sensor recordsFetchLead;
+
+    private int assignmentId = 0;
+    private Set<TopicPartition> assignedPartitions = Collections.emptySet();

Review Comment:
   Yeah, something like that (though we need to check that all metric names include the partition in the same format), rather than trying to remember all the previously assigned partitions. The idea is that since the assignment can now be updated by different threads, we may lose track of the full history of assigned partitions, which could leave dangling metrics in the registry that are never removed, or remove some metrics unnecessarily. Relying only on the new assignment and always looping over all metrics to trim them seems safer.
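   A rough sketch of that idea, assuming the partition-level metric names carry `topic`/`partition` tags (the helper name and tag keys here are illustrative, not this PR's code):
   ```java
   // Illustrative only: trim partition-level metrics that no longer match the current
   // assignment by scanning the whole registry, rather than tracking prior assignments.
   private void trimStaleMetrics(Set<TopicPartition> newAssignment) {
       for (MetricName name : new HashSet<>(metrics.metrics().keySet())) {
           Map<String, String> tags = name.tags();
           String topic = tags.get("topic");
           String partition = tags.get("partition");
           if (topic == null || partition == null)
               continue; // not a partition-level metric

           TopicPartition tp = new TopicPartition(topic, Integer.parseInt(partition));
           if (!newAssignment.contains(tp))
               metrics.removeMetric(name);
       }
   }
   ```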




[GitHub] [kafka] kirktrue commented on pull request #13301: KAFKA-14758: Extract inner classes from Fetcher for reuse in refactoring

Posted by "kirktrue (via GitHub)" <gi...@apache.org>.
kirktrue commented on PR #13301:
URL: https://github.com/apache/kafka/pull/13301#issuecomment-1452775886

   @guozhangwang wrote:
   
   > A meta comment is that for the extracted classes, I think we should add unit test coverages for them (hopefully most of them are already in the `FetcherTest`...) i.e. one test class for each of the three.
   
   Because `CompletedFetch`, `FetchManagerMetrics`, and `FetchResponseMetricAggregator` were all `private` inner classes of `Fetcher`, they are not directly referenced in `FetcherTest`. As such, there aren't any tests in `FetcherTest` that I can extract to make `CompletedFetchTest`, `FetchManagerMetricsTest`, or `FetchResponseMetricAggregatorTest` test classes. Any new unit test classes would be net new code.
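   For illustration, a net-new test could look roughly like this (a sketch against the package-private constructor in this PR; the `FetchManagerMetrics`/`FetchResponseMetricAggregator` setup details are assumptions, not committed code):
   ```java
   package org.apache.kafka.clients.consumer.internals;

   import org.apache.kafka.clients.consumer.ConsumerRecord;
   import org.apache.kafka.clients.consumer.OffsetResetStrategy;
   import org.apache.kafka.common.IsolationLevel;
   import org.apache.kafka.common.TopicPartition;
   import org.apache.kafka.common.message.FetchResponseData;
   import org.apache.kafka.common.metrics.Metrics;
   import org.apache.kafka.common.protocol.ApiKeys;
   import org.apache.kafka.common.record.CompressionType;
   import org.apache.kafka.common.record.MemoryRecords;
   import org.apache.kafka.common.record.SimpleRecord;
   import org.apache.kafka.common.serialization.StringDeserializer;
   import org.apache.kafka.common.utils.BufferSupplier;
   import org.apache.kafka.common.utils.LogContext;
   import org.junit.jupiter.api.Test;

   import java.util.Collections;
   import java.util.HashSet;
   import java.util.List;

   import static org.junit.jupiter.api.Assertions.assertEquals;

   public class CompletedFetchTest {

       @Test
       public void testFetchRecordsDeserializesKeysAndValues() {
           TopicPartition tp = new TopicPartition("test", 0);
           LogContext logContext = new LogContext();

           // The partition must be assigned so drain() can safely move it to the end.
           SubscriptionState subscriptions = new SubscriptionState(logContext, OffsetResetStrategy.NONE);
           subscriptions.assignFromUser(Collections.singleton(tp));

           // One record batch with a single record at offset 0.
           MemoryRecords memoryRecords = MemoryRecords.withRecords(CompressionType.NONE,
                   new SimpleRecord("key".getBytes(), "value".getBytes()));

           FetchManagerMetrics fetchMetrics = new FetchManagerMetrics(new Metrics(), new FetcherMetricsRegistry());
           FetchResponseMetricAggregator aggregator =
                   new FetchResponseMetricAggregator(fetchMetrics, new HashSet<>(Collections.singleton(tp)));

           CompletedFetch<String, String> completedFetch = new CompletedFetch<>(
                   logContext,
                   subscriptions,
                   false,                              // checkCrcs
                   BufferSupplier.create(),
                   new StringDeserializer(),
                   new StringDeserializer(),
                   IsolationLevel.READ_UNCOMMITTED,
                   tp,
                   new FetchResponseData.PartitionData(),
                   aggregator,
                   memoryRecords.batches().iterator(),
                   0L,                                 // fetchOffset
                   ApiKeys.FETCH.latestVersion());

           List<ConsumerRecord<String, String>> records = completedFetch.fetchRecords(10);

           assertEquals(1, records.size());
           assertEquals("key", records.get(0).key());
           assertEquals("value", records.get(0).value());
       }
   }
   ```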

