Posted to jira@kafka.apache.org by "philipnee (via GitHub)" <gi...@apache.org> on 2023/02/25 04:43:53 UTC

[GitHub] [kafka] philipnee commented on a diff in pull request #13301: KAFKA-14758: Extract inner classes from Fetcher for reuse in refactoring

philipnee commented on code in PR #13301:
URL: https://github.com/apache/kafka/pull/13301#discussion_r1117808261


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##########
@@ -0,0 +1,356 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.CloseableIterator;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+/**
+ * {@link CompletedFetch} represents a {@link RecordBatch batch} of {@link Record records} that was returned from the
+ * broker via a {@link FetchRequest}. It contains logic to maintain state between calls to {@link #fetchRecords(int)}.
+ *
+ * @param <K> Record key type
+ * @param <V> Record value type
+ */
+class CompletedFetch<K, V> {
+
+    private final Logger log;
+    private final SubscriptionState subscriptions;
+    private final boolean checkCrcs;
+    private final BufferSupplier decompressionBufferSupplier;
+    private final Deserializer<K> keyDeserializer;
+    private final Deserializer<V> valueDeserializer;
+    private final IsolationLevel isolationLevel;
+    public final TopicPartition partition;
+    private final Iterator<? extends RecordBatch> batches;
+    private final Set<Long> abortedProducerIds;
+    private final PriorityQueue<FetchResponseData.AbortedTransaction> abortedTransactions;
+    final FetchResponseData.PartitionData partitionData;
+    final FetchResponseMetricAggregator metricAggregator;
+    final short responseVersion;
+
+    private int recordsRead;
+    private int bytesRead;
+    private RecordBatch currentBatch;
+    private Record lastRecord;
+    private CloseableIterator<Record> records;
+    long nextFetchOffset;
+    Optional<Integer> lastEpoch;
+    boolean isConsumed = false;
+    private Exception cachedRecordException = null;
+    private boolean corruptLastRecord = false;
+    boolean initialized = false;
+
+    CompletedFetch(LogContext logContext,
+                   SubscriptionState subscriptions,
+                   boolean checkCrcs,
+                   BufferSupplier decompressionBufferSupplier,
+                   Deserializer<K> keyDeserializer,
+                   Deserializer<V> valueDeserializer,
+                   IsolationLevel isolationLevel,
+                   TopicPartition partition,
+                   FetchResponseData.PartitionData partitionData,
+                   FetchResponseMetricAggregator metricAggregator,
+                   Iterator<? extends RecordBatch> batches,
+                   Long fetchOffset,
+                   short responseVersion) {
+        this.log = logContext.logger(CompletedFetch.class);
+        this.subscriptions = subscriptions;
+        this.checkCrcs = checkCrcs;
+        this.decompressionBufferSupplier = decompressionBufferSupplier;
+        this.keyDeserializer = keyDeserializer;
+        this.valueDeserializer = valueDeserializer;
+        this.isolationLevel = isolationLevel;
+        this.partition = partition;
+        this.partitionData = partitionData;
+        this.metricAggregator = metricAggregator;
+        this.batches = batches;
+        this.nextFetchOffset = fetchOffset;
+        this.responseVersion = responseVersion;
+        this.lastEpoch = Optional.empty();
+        this.abortedProducerIds = new HashSet<>();
+        this.abortedTransactions = abortedTransactions(partitionData);
+    }
+
+    /**
+     * Draining a {@link CompletedFetch} will signal that the data has been consumed and the underlying resources
+     * are closed.
+     *
+     * <p/>
+     *
+     * TODO: Is this the same as close()-ing the CompletedFetch?
+     * TODO: Is the fetch usable after it's consumed? Should there be some kind of error thrown if it's used after
+     *       it's been drained?
+     */
+    void drain() {
+        if (!isConsumed) {
+            maybeCloseRecordStream();
+            cachedRecordException = null;
+            this.isConsumed = true;
+            this.metricAggregator.record(partition, bytesRead, recordsRead);
+
+            // we move the partition to the end if we received some bytes. This way, it's more likely that partitions
+            // for the same topic can remain together (allowing for more efficient serialization).
+            if (bytesRead > 0)
+                subscriptions.movePartitionToEnd(partition);
+        }
+    }
+
+    private void maybeEnsureValid(RecordBatch batch) {
+        if (checkCrcs && currentBatch.magic() >= RecordBatch.MAGIC_VALUE_V2) {

Review Comment:
   Ditto: it would be better if we could return early here.
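   
   For illustration, a rough sketch of the early-return shape being suggested (the guard is inverted; the body below it is assumed, since the diff is truncated at the condition):
   
       private void maybeEnsureValid(RecordBatch batch) {
           // Bail out early instead of nesting the whole body under the CRC check.
           if (!checkCrcs || currentBatch.magic() < RecordBatch.MAGIC_VALUE_V2)
               return;
   
           try {
               // Assumed body: validate the batch checksum.
               batch.ensureValid();
           } catch (CorruptRecordException e) {
               throw new KafkaException("Record batch for partition " + partition +
                       " at offset " + batch.baseOffset() + " is invalid", e);
           }
       }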



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchManagerMetrics.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.Value;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The {@link FetchManagerMetrics} class provides wrapper methods to record lag, lead, latency, and fetch metrics.
+ * It keeps an internal ID of the assigned set of partitions which is updated to ensure the set of metrics it
+ * records matches up with the topic-partitions in use.
+ */
+class FetchManagerMetrics {

Review Comment:
   Can we keep a list of the metrics we measure here, or a link to a webpage? I feel these metrics are essential for observability, but they aren't very well explained/documented.
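   
   For example, a sketch of what that list could look like in the class javadoc. The exact names recorded by this class are an assumption here; these are the standard consumer-fetch-manager-metrics names Kafka already documents:
   
       /**
        * Recorded metrics include, among others:
        * <ul>
        *   <li>{@code fetch-latency-avg} / {@code fetch-latency-max}: fetch request latency</li>
        *   <li>{@code fetch-size-avg} / {@code fetch-size-max}: bytes fetched per request (also per topic)</li>
        *   <li>{@code records-per-request-avg}: records fetched per request (also per topic)</li>
        *   <li>{@code bytes-consumed-rate} / {@code records-consumed-rate}: consumption throughput (also per topic)</li>
        *   <li>{@code records-lag} / {@code records-lag-max}: how far the consumer is behind the log end offset (per partition)</li>
        *   <li>{@code records-lead} / {@code records-lead-min}: distance from the log start offset (per partition)</li>
        * </ul>
        */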



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java:
##########
@@ -0,0 +1,356 @@
+    /**
+     * Draining a {@link CompletedFetch} will signal that the data has been consumed and the underlying resources
+     * are closed.
+     *
+     * <p/>
+     *
+     * TODO: Is this the same as close()-ing the CompletedFetch?
+     * TODO: Is the fetch usable after it's consumed? Should there be some kind of error thrown if it's used after
+     *       it's been drained?
+     */
+    void drain() {
+        if (!isConsumed) {

Review Comment:
   IIRC this is the original code, but it annoys me a bit. It would be more readable if we did:
   if (isConsumed) return;
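   
   Applied to the drain() body quoted above, that would look roughly like:
   
       void drain() {
           if (isConsumed)
               return;
   
           maybeCloseRecordStream();
           cachedRecordException = null;
           isConsumed = true;
           metricAggregator.record(partition, bytesRead, recordsRead);
   
           // Move the partition to the end if we received some bytes, so that partitions
           // for the same topic are more likely to stay together (more efficient serialization).
           if (bytesRead > 0)
               subscriptions.movePartitionToEnd(partition);
       }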
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org