You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/23 14:14:53 UTC

[GitHub] [kafka] ijuma commented on a change in pull request #9758: MINOR: remove FetchResponse.AbortedTransaction and redundant construc…

ijuma commented on a change in pull request #9758:
URL: https://github.com/apache/kafka/pull/9758#discussion_r581044552



##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -290,7 +291,7 @@ public void onSuccess(ClientResponse resp) {
                             Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
                             FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);
 
-                            for (Map.Entry<TopicPartition, FetchResponse.PartitionData<Records>> entry : response.responseData().entrySet()) {
+                            for (Map.Entry<TopicPartition, FetchResponseData.FetchablePartitionResponse> entry : response.responseData().entrySet()) {

Review comment:
       `FetchablePartitionResponse` is a bit long and redundant. Could we find a shorter name for it now that `PartitionData` is gone?

##########
File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetchsession/FetchSessionBenchmark.java
##########
@@ -70,24 +70,26 @@ public void setUp() {
         handler = new FetchSessionHandler(LOG_CONTEXT, 1);
         FetchSessionHandler.Builder builder = handler.newBuilder();
 
-        LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> respMap = new LinkedHashMap<>();
+        LinkedHashMap<TopicPartition, FetchResponseData.FetchablePartitionResponse> respMap = new LinkedHashMap<>();
         for (int i = 0; i < partitionCount; i++) {
             TopicPartition tp = new TopicPartition("foo", i);
             FetchRequest.PartitionData partitionData = new FetchRequest.PartitionData(0, 0, 200,
                     Optional.empty());
             fetches.put(tp, partitionData);
             builder.add(tp, partitionData);
-            respMap.put(tp, new FetchResponse.PartitionData<>(
-                    Errors.NONE,
-                    0L,
-                    0L,
-                    0,
-                    null,
-                    null));
+            respMap.put(tp, new FetchResponseData.FetchablePartitionResponse()
+                            .setPartition(tp.partition())
+                            .setErrorCode(Errors.NONE.code())
+                            .setHighWatermark(0)
+                            .setLastStableOffset(0)
+                            .setLogStartOffset(0)
+                            .setAbortedTransactions(null)
+                            .setRecords(null)
+                            .setPreferredReadReplica(FetchResponse.INVALID_PREFERRED_REPLICA_ID));

Review comment:
       Maybe we can remove the defaults from this and every other place where we build `FetchablePartitionResponse`

##########
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##########
@@ -1257,7 +1259,7 @@ private CompletedFetch initializeCompletedFetch(CompletedFetch nextCompletedFetc
 
                 log.trace("Preparing to read {} bytes of data for partition {} with offset {}",
                         partition.records().sizeInBytes(), tp, position);
-                Iterator<? extends RecordBatch> batches = partition.records().batches().iterator();
+                Iterator<? extends RecordBatch> batches = ((Records) partition.records()).batches().iterator();

Review comment:
       Could we encapsulate this cast in a utility method?

##########
File path: core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
##########
@@ -110,7 +113,7 @@ class ReplicaAlterLogDirsThread(name: String,
   // process fetched data
   override def processPartitionData(topicPartition: TopicPartition,
                                     fetchOffset: Long,
-                                    partitionData: PartitionData[Records]): Option[LogAppendInfo] = {
+                                    partitionData: FetchResponseData.FetchablePartitionResponse): Option[LogAppendInfo] = {

Review comment:
       Could this be `FetchData` too? Are there other places like it?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -57,238 +55,51 @@
  *     the fetch offset after the index lookup
  * - {@link Errors#UNKNOWN_SERVER_ERROR} For any unexpected errors
  */
-public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
-
-    public static final long INVALID_HIGHWATERMARK = -1L;
+public class FetchResponse extends AbstractResponse {
+    public static final long INVALID_HIGH_WATERMARK = -1L;
     public static final long INVALID_LAST_STABLE_OFFSET = -1L;
     public static final long INVALID_LOG_START_OFFSET = -1L;
     public static final int INVALID_PREFERRED_REPLICA_ID = -1;
 
     private final FetchResponseData data;
-    private final LinkedHashMap<TopicPartition, PartitionData<T>> responseDataMap;
+    private final LinkedHashMap<TopicPartition, FetchResponseData.FetchablePartitionResponse> responseData;
 
     @Override
     public FetchResponseData data() {
         return data;
     }
 
-    public static final class AbortedTransaction {
-        public final long producerId;
-        public final long firstOffset;
-
-        public AbortedTransaction(long producerId, long firstOffset) {
-            this.producerId = producerId;
-            this.firstOffset = firstOffset;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o)
-                return true;
-            if (o == null || getClass() != o.getClass())
-                return false;
-
-            AbortedTransaction that = (AbortedTransaction) o;
-
-            return producerId == that.producerId && firstOffset == that.firstOffset;
-        }
-
-        @Override
-        public int hashCode() {
-            int result = Long.hashCode(producerId);
-            result = 31 * result + Long.hashCode(firstOffset);
-            return result;
-        }
-
-        @Override
-        public String toString() {
-            return "(producerId=" + producerId + ", firstOffset=" + firstOffset + ")";
-        }
-
-        static AbortedTransaction fromMessage(FetchResponseData.AbortedTransaction abortedTransaction) {
-            return new AbortedTransaction(abortedTransaction.producerId(), abortedTransaction.firstOffset());
-        }
-    }
-
-    public static final class PartitionData<T extends BaseRecords> {
-        private final FetchResponseData.FetchablePartitionResponse partitionResponse;
-
-        // Derived fields
-        private final Optional<Integer> preferredReplica;
-        private final List<AbortedTransaction> abortedTransactions;
-        private final Errors error;
-
-        private PartitionData(FetchResponseData.FetchablePartitionResponse partitionResponse) {
-            // We partially construct FetchablePartitionResponse since we don't know the partition ID at this point
-            // When we convert the PartitionData (and other fields) into FetchResponseData down in toMessage, we
-            // set the partition IDs.
-            this.partitionResponse = partitionResponse;
-            this.preferredReplica = Optional.of(partitionResponse.preferredReadReplica())
-                .filter(replicaId -> replicaId != INVALID_PREFERRED_REPLICA_ID);
-
-            if (partitionResponse.abortedTransactions() == null) {
-                this.abortedTransactions = null;
-            } else {
-                this.abortedTransactions = partitionResponse.abortedTransactions().stream()
-                    .map(AbortedTransaction::fromMessage)
-                    .collect(Collectors.toList());
-            }
-
-            this.error = Errors.forCode(partitionResponse.errorCode());
-        }
-
-        public PartitionData(Errors error,
-                             long highWatermark,
-                             long lastStableOffset,
-                             long logStartOffset,
-                             Optional<Integer> preferredReadReplica,
-                             List<AbortedTransaction> abortedTransactions,
-                             Optional<FetchResponseData.EpochEndOffset> divergingEpoch,
-                             T records) {
-            this.preferredReplica = preferredReadReplica;
-            this.abortedTransactions = abortedTransactions;
-            this.error = error;
-
-            FetchResponseData.FetchablePartitionResponse partitionResponse =
-                new FetchResponseData.FetchablePartitionResponse();
-            partitionResponse.setErrorCode(error.code())
-                .setHighWatermark(highWatermark)
-                .setLastStableOffset(lastStableOffset)
-                .setLogStartOffset(logStartOffset);
-            if (abortedTransactions != null) {
-                partitionResponse.setAbortedTransactions(abortedTransactions.stream().map(
-                    aborted -> new FetchResponseData.AbortedTransaction()
-                        .setProducerId(aborted.producerId)
-                        .setFirstOffset(aborted.firstOffset))
-                    .collect(Collectors.toList()));
-            } else {
-                partitionResponse.setAbortedTransactions(null);
-            }
-            partitionResponse.setPreferredReadReplica(preferredReadReplica.orElse(INVALID_PREFERRED_REPLICA_ID));
-            partitionResponse.setRecordSet(records);
-            divergingEpoch.ifPresent(partitionResponse::setDivergingEpoch);
-
-            this.partitionResponse = partitionResponse;
-        }
-
-        public PartitionData(Errors error,
-                             long highWatermark,
-                             long lastStableOffset,
-                             long logStartOffset,
-                             Optional<Integer> preferredReadReplica,
-                             List<AbortedTransaction> abortedTransactions,
-                             T records) {
-            this(error, highWatermark, lastStableOffset, logStartOffset, preferredReadReplica,
-                abortedTransactions, Optional.empty(), records);
-        }
-
-        public PartitionData(Errors error,
-                             long highWatermark,
-                             long lastStableOffset,
-                             long logStartOffset,
-                             List<AbortedTransaction> abortedTransactions,
-                             T records) {
-            this(error, highWatermark, lastStableOffset, logStartOffset, Optional.empty(), abortedTransactions, records);
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o)
-                return true;
-            if (o == null || getClass() != o.getClass())
-                return false;
-
-            PartitionData that = (PartitionData) o;
-
-            return this.partitionResponse.equals(that.partitionResponse);
-        }
-
-        @Override
-        public int hashCode() {
-            return this.partitionResponse.hashCode();
-        }
-
-        @Override
-        public String toString() {
-            return "(error=" + error() +
-                    ", highWaterMark=" + highWatermark() +
-                    ", lastStableOffset = " + lastStableOffset() +
-                    ", logStartOffset = " + logStartOffset() +
-                    ", preferredReadReplica = " + preferredReadReplica().map(Object::toString).orElse("absent") +
-                    ", abortedTransactions = " + abortedTransactions() +
-                    ", divergingEpoch =" + divergingEpoch() +
-                    ", recordsSizeInBytes=" + records().sizeInBytes() + ")";
-        }
-
-        public Errors error() {
-            return error;
-        }
-
-        public long highWatermark() {
-            return partitionResponse.highWatermark();
-        }
-
-        public long lastStableOffset() {
-            return partitionResponse.lastStableOffset();
-        }
-
-        public long logStartOffset() {
-            return partitionResponse.logStartOffset();
-        }
-
-        public Optional<Integer> preferredReadReplica() {
-            return preferredReplica;
-        }
-
-        public List<AbortedTransaction> abortedTransactions() {
-            return abortedTransactions;
-        }
-
-        public Optional<FetchResponseData.EpochEndOffset> divergingEpoch() {
-            FetchResponseData.EpochEndOffset epochEndOffset = partitionResponse.divergingEpoch();
-            if (epochEndOffset.epoch() < 0) {
-                return Optional.empty();
-            } else {
-                return Optional.of(epochEndOffset);
-            }
-        }
-
-        @SuppressWarnings("unchecked")
-        public T records() {
-            return (T) partitionResponse.recordSet();
-        }
-    }
-
-    /**
-     * From version 3 or later, the entries in `responseData` should be in the same order as the entries in
-     * `FetchRequest.fetchData`.
-     *
-     * @param error             The top-level error code.
-     * @param responseData      The fetched data grouped by partition.
-     * @param throttleTimeMs    The time in milliseconds that the response was throttled
-     * @param sessionId         The fetch session id.
-     */
     public FetchResponse(Errors error,

Review comment:
       I think it would be better to make this a static factory method and keep the constructor for the case where we receive `FetchResponseData`.

##########
File path: core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
##########
@@ -127,7 +124,7 @@ class LogOffsetTest extends BaseRequestTest {
       Map(topicPartition -> new FetchRequest.PartitionData(consumerOffsets.head, FetchRequest.INVALID_LOG_START_OFFSET,
         300 * 1024, Optional.empty())).asJava).build()
     val fetchResponse = sendFetchRequest(fetchRequest)
-    assertFalse(fetchResponse.responseData.get(topicPartition).records.batches.iterator.hasNext)
+    assertFalse(fetchResponse.responseData.get(topicPartition).records.asInstanceOf[Records].batches.iterator.hasNext)

Review comment:
       Similar to elsewhere, it would be useful to have a utility method to avoid unsafe operations all over the code.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -57,238 +55,51 @@
  *     the fetch offset after the index lookup
  * - {@link Errors#UNKNOWN_SERVER_ERROR} For any unexpected errors
  */
-public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
-
-    public static final long INVALID_HIGHWATERMARK = -1L;
+public class FetchResponse extends AbstractResponse {
+    public static final long INVALID_HIGH_WATERMARK = -1L;
     public static final long INVALID_LAST_STABLE_OFFSET = -1L;
     public static final long INVALID_LOG_START_OFFSET = -1L;
     public static final int INVALID_PREFERRED_REPLICA_ID = -1;
 
     private final FetchResponseData data;
-    private final LinkedHashMap<TopicPartition, PartitionData<T>> responseDataMap;
+    private final LinkedHashMap<TopicPartition, FetchResponseData.FetchablePartitionResponse> responseData;
 
     @Override
     public FetchResponseData data() {
         return data;
     }
 
-    public static final class AbortedTransaction {
-        public final long producerId;
-        public final long firstOffset;
-
-        public AbortedTransaction(long producerId, long firstOffset) {
-            this.producerId = producerId;
-            this.firstOffset = firstOffset;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o)
-                return true;
-            if (o == null || getClass() != o.getClass())
-                return false;
-
-            AbortedTransaction that = (AbortedTransaction) o;
-
-            return producerId == that.producerId && firstOffset == that.firstOffset;
-        }
-
-        @Override
-        public int hashCode() {
-            int result = Long.hashCode(producerId);
-            result = 31 * result + Long.hashCode(firstOffset);
-            return result;
-        }
-
-        @Override
-        public String toString() {
-            return "(producerId=" + producerId + ", firstOffset=" + firstOffset + ")";
-        }
-
-        static AbortedTransaction fromMessage(FetchResponseData.AbortedTransaction abortedTransaction) {
-            return new AbortedTransaction(abortedTransaction.producerId(), abortedTransaction.firstOffset());
-        }
-    }
-
-    public static final class PartitionData<T extends BaseRecords> {
-        private final FetchResponseData.FetchablePartitionResponse partitionResponse;
-
-        // Derived fields
-        private final Optional<Integer> preferredReplica;
-        private final List<AbortedTransaction> abortedTransactions;
-        private final Errors error;
-
-        private PartitionData(FetchResponseData.FetchablePartitionResponse partitionResponse) {
-            // We partially construct FetchablePartitionResponse since we don't know the partition ID at this point
-            // When we convert the PartitionData (and other fields) into FetchResponseData down in toMessage, we
-            // set the partition IDs.
-            this.partitionResponse = partitionResponse;
-            this.preferredReplica = Optional.of(partitionResponse.preferredReadReplica())
-                .filter(replicaId -> replicaId != INVALID_PREFERRED_REPLICA_ID);
-
-            if (partitionResponse.abortedTransactions() == null) {
-                this.abortedTransactions = null;
-            } else {
-                this.abortedTransactions = partitionResponse.abortedTransactions().stream()
-                    .map(AbortedTransaction::fromMessage)
-                    .collect(Collectors.toList());
-            }
-
-            this.error = Errors.forCode(partitionResponse.errorCode());
-        }
-
-        public PartitionData(Errors error,
-                             long highWatermark,
-                             long lastStableOffset,
-                             long logStartOffset,
-                             Optional<Integer> preferredReadReplica,
-                             List<AbortedTransaction> abortedTransactions,
-                             Optional<FetchResponseData.EpochEndOffset> divergingEpoch,
-                             T records) {
-            this.preferredReplica = preferredReadReplica;
-            this.abortedTransactions = abortedTransactions;
-            this.error = error;
-
-            FetchResponseData.FetchablePartitionResponse partitionResponse =
-                new FetchResponseData.FetchablePartitionResponse();
-            partitionResponse.setErrorCode(error.code())
-                .setHighWatermark(highWatermark)
-                .setLastStableOffset(lastStableOffset)
-                .setLogStartOffset(logStartOffset);
-            if (abortedTransactions != null) {
-                partitionResponse.setAbortedTransactions(abortedTransactions.stream().map(
-                    aborted -> new FetchResponseData.AbortedTransaction()
-                        .setProducerId(aborted.producerId)
-                        .setFirstOffset(aborted.firstOffset))
-                    .collect(Collectors.toList()));
-            } else {
-                partitionResponse.setAbortedTransactions(null);
-            }
-            partitionResponse.setPreferredReadReplica(preferredReadReplica.orElse(INVALID_PREFERRED_REPLICA_ID));
-            partitionResponse.setRecordSet(records);
-            divergingEpoch.ifPresent(partitionResponse::setDivergingEpoch);
-
-            this.partitionResponse = partitionResponse;
-        }
-
-        public PartitionData(Errors error,
-                             long highWatermark,
-                             long lastStableOffset,
-                             long logStartOffset,
-                             Optional<Integer> preferredReadReplica,
-                             List<AbortedTransaction> abortedTransactions,
-                             T records) {
-            this(error, highWatermark, lastStableOffset, logStartOffset, preferredReadReplica,
-                abortedTransactions, Optional.empty(), records);
-        }
-
-        public PartitionData(Errors error,
-                             long highWatermark,
-                             long lastStableOffset,
-                             long logStartOffset,
-                             List<AbortedTransaction> abortedTransactions,
-                             T records) {
-            this(error, highWatermark, lastStableOffset, logStartOffset, Optional.empty(), abortedTransactions, records);
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o)
-                return true;
-            if (o == null || getClass() != o.getClass())
-                return false;
-
-            PartitionData that = (PartitionData) o;
-
-            return this.partitionResponse.equals(that.partitionResponse);
-        }
-
-        @Override
-        public int hashCode() {
-            return this.partitionResponse.hashCode();
-        }
-
-        @Override
-        public String toString() {
-            return "(error=" + error() +
-                    ", highWaterMark=" + highWatermark() +
-                    ", lastStableOffset = " + lastStableOffset() +
-                    ", logStartOffset = " + logStartOffset() +
-                    ", preferredReadReplica = " + preferredReadReplica().map(Object::toString).orElse("absent") +
-                    ", abortedTransactions = " + abortedTransactions() +
-                    ", divergingEpoch =" + divergingEpoch() +
-                    ", recordsSizeInBytes=" + records().sizeInBytes() + ")";
-        }
-
-        public Errors error() {
-            return error;
-        }
-
-        public long highWatermark() {
-            return partitionResponse.highWatermark();
-        }
-
-        public long lastStableOffset() {
-            return partitionResponse.lastStableOffset();
-        }
-
-        public long logStartOffset() {
-            return partitionResponse.logStartOffset();
-        }
-
-        public Optional<Integer> preferredReadReplica() {
-            return preferredReplica;
-        }
-
-        public List<AbortedTransaction> abortedTransactions() {
-            return abortedTransactions;
-        }
-
-        public Optional<FetchResponseData.EpochEndOffset> divergingEpoch() {
-            FetchResponseData.EpochEndOffset epochEndOffset = partitionResponse.divergingEpoch();
-            if (epochEndOffset.epoch() < 0) {
-                return Optional.empty();
-            } else {
-                return Optional.of(epochEndOffset);
-            }
-        }
-
-        @SuppressWarnings("unchecked")
-        public T records() {
-            return (T) partitionResponse.recordSet();
-        }
-    }
-
-    /**
-     * From version 3 or later, the entries in `responseData` should be in the same order as the entries in
-     * `FetchRequest.fetchData`.
-     *
-     * @param error             The top-level error code.
-     * @param responseData      The fetched data grouped by partition.
-     * @param throttleTimeMs    The time in milliseconds that the response was throttled
-     * @param sessionId         The fetch session id.
-     */
     public FetchResponse(Errors error,
-                         LinkedHashMap<TopicPartition, PartitionData<T>> responseData,
                          int throttleTimeMs,
-                         int sessionId) {
-        super(ApiKeys.FETCH);
-        this.data = toMessage(throttleTimeMs, error, responseData.entrySet().iterator(), sessionId);
-        this.responseDataMap = responseData;
+                         int sessionId,
+                         LinkedHashMap<TopicPartition, FetchResponseData.FetchablePartitionResponse> responseData) {
+        this(new FetchResponseData()
+            .setSessionId(sessionId)
+            .setErrorCode(error.code())
+            .setThrottleTimeMs(throttleTimeMs)
+            .setResponses(responseData.entrySet().stream().map(entry -> new FetchResponseData.FetchableTopicResponse()
+                .setTopic(entry.getKey().topic())
+                .setPartitionResponses(Collections.singletonList(entry.getValue().setPartition(entry.getKey().partition()))))
+                .collect(Collectors.toList())));
     }
 
     public FetchResponse(FetchResponseData fetchResponseData) {
         super(ApiKeys.FETCH);
         this.data = fetchResponseData;
-        this.responseDataMap = toResponseDataMap(fetchResponseData);
+        this.responseData = new LinkedHashMap<>();

Review comment:
       This isn't needed when we return a fetch from the broker, right? If this is true, can we remove it from the fetch response and built it on the client when needed?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org