Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2023/01/06 15:33:05 UTC

[GitHub] [kafka] fvaleri opened a new pull request, #13085: KAFKA-14568: Move FetchDataInfo and related to storage module

fvaleri opened a new pull request, #13085:
URL: https://github.com/apache/kafka/pull/13085

   Part of KAFKA-14470: Move log layer to storage module.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] ijuma commented on a diff in pull request #13085: KAFKA-14568: Move FetchDataInfo and related to storage module

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #13085:
URL: https://github.com/apache/kafka/pull/13085#discussion_r1063544116


##########
storage/src/main/java/org/apache/kafka/server/log/internals/FetchIsolation.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.utils.FetchRequestUtils;
+
+public interface FetchIsolation {

Review Comment:
   This should be an enum.





[GitHub] [kafka] fvaleri commented on a diff in pull request #13085: KAFKA-14568: Move FetchDataInfo and related to storage module

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #13085:
URL: https://github.com/apache/kafka/pull/13085#discussion_r1064119776


##########
storage/src/main/java/org/apache/kafka/server/log/internals/FetchParams.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.replica.ClientMetadata;
+import org.apache.kafka.common.utils.FetchRequestUtils;
+
+import java.util.Objects;
+import java.util.Optional;
+
+public class FetchParams {
+    private final short requestVersion;
+    private final int replicaId;
+    private final long maxWaitMs;
+    private final int minBytes;
+    private final int maxBytes;
+    private final FetchIsolation isolation;
+    private Optional<ClientMetadata> clientMetadata;
+
+    public FetchParams(short requestVersion,
+                       int replicaId,
+                       long maxWaitMs,
+                       int minBytes,
+                       int maxBytes,
+                       FetchIsolation isolation,
+                       Optional<ClientMetadata> clientMetadata) {
+        this.requestVersion = requestVersion;
+        this.replicaId = replicaId;
+        this.maxWaitMs = maxWaitMs;
+        this.minBytes = minBytes;
+        this.maxBytes = maxBytes;
+        this.isolation = isolation;
+        this.clientMetadata = clientMetadata;
+    }
+
+    public boolean isFromFollower() {
+        return FetchRequestUtils.isValidBrokerId(replicaId);
+    }
+
+    public boolean isFromConsumer() {
+        return FetchRequestUtils.isConsumer(replicaId);
+    }
+
+    public boolean fetchOnlyLeader() {
+        return isFromFollower() || (isFromConsumer() && !clientMetadata.isPresent());
+    }
+
+    public boolean hardMaxBytesLimit() {
+        return requestVersion <= 2;
+    }
+
+    public short requestVersion() {
+        return requestVersion;
+    }
+
+    public int replicaId() {
+        return replicaId;
+    }
+
+    public long maxWaitMs() {
+        return maxWaitMs;
+    }
+
+    public int minBytes() {
+        return minBytes;
+    }
+
+    public int maxBytes() {
+        return maxBytes;
+    }
+
+    public FetchIsolation isolation() {
+        return isolation;
+    }
+
+    public Optional<ClientMetadata> clientMetadata() {
+        return clientMetadata;
+    }
+
+    @Override
+    public boolean equals(Object o) {

Review Comment:
   Yes, they are needed for the following tests:
   
   - ReplicaAlterLogDirsThreadTest.shouldReplaceCurrentLogDirWhenCaughtUp
   - ReplicaAlterLogDirsThreadTest.shouldUpdateLeaderEpochAfterFencedEpochError
   
   They both call `mockFetchFromCurrentLog`, which uses the equality matcher on an instance of `FetchParams`. I would prefer not to touch the tests if possible.
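   For readers following the thread: Mockito's `ArgumentMatchers.eq` compares the expected and the actual argument with `equals`, so a class that does not override it would only match the very same instance. A minimal sketch of the value equality involved, assuming a hypothetical two-field `Params` class (not the real `FetchParams`, whose fields are listed in the diff above):

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical stand-in for FetchParams; the field set is reduced for illustration.
final class Params {
    private final int replicaId;
    private final Optional<String> clientMetadata;

    Params(int replicaId, Optional<String> clientMetadata) {
        this.replicaId = replicaId;
        this.clientMetadata = clientMetadata;
    }

    // Two instances are equal when every field is equal.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Params that = (Params) o;
        return replicaId == that.replicaId
            && clientMetadata.equals(that.clientMetadata);
    }

    // hashCode must stay consistent with equals.
    @Override
    public int hashCode() {
        return Objects.hash(replicaId, clientMetadata);
    }
}
```

   With overrides like these, a test can build a fresh expected object and still match the one created by the code under test.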





[GitHub] [kafka] fvaleri commented on a diff in pull request #13085: KAFKA-14568: Move FetchDataInfo and related to storage module

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #13085:
URL: https://github.com/apache/kafka/pull/13085#discussion_r1063631875


##########
storage/src/main/java/org/apache/kafka/server/log/internals/FetchIsolation.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.utils.FetchRequestUtils;
+
+public interface FetchIsolation {

Review Comment:
   Ok, thanks. Should be fine now.





[GitHub] [kafka] fvaleri commented on a diff in pull request #13085: KAFKA-14568: Move FetchDataInfo and related to storage module

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #13085:
URL: https://github.com/apache/kafka/pull/13085#discussion_r1064122413


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -1189,7 +1189,7 @@ class Partition(val topicPartition: TopicPartition,
    * @param minOneMessage whether to ensure that at least one complete message is returned
    * @param updateFetchState true if the Fetch should update replica state (only applies to follower fetches)
    * @return [[LogReadInfo]] containing the fetched records or the diverging epoch if present
-   * @throws NotLeaderOrFollowerException if this node is not the current leader and [[FetchParams.fetchOnlyLeader]]
+   * @throws NotLeaderOrFollowerException if this node is not the current leader and `FetchParams.fetchOnlyLeader`

Review Comment:
   TBH, I haven't found a way to link a Java method from scaladoc. Do you have any suggestions here?





[GitHub] [kafka] fvaleri commented on a diff in pull request #13085: KAFKA-14568: Move FetchDataInfo and related to storage module

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #13085:
URL: https://github.com/apache/kafka/pull/13085#discussion_r1064117991


##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -2312,26 +2313,26 @@ class ReplicaManagerTest {
   }
 
   private def fetchPartitions(
-    replicaManager: ReplicaManager,
-    replicaId: Int,
-    fetchInfos: Seq[(TopicIdPartition, PartitionData)],
-    responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit,
-    requestVersion: Short = ApiKeys.FETCH.latestVersion,
-    maxWaitMs: Long = 0,
-    minBytes: Int = 1,
-    maxBytes: Int = 1024 * 1024,
-    quota: ReplicaQuota = UnboundedQuota,
-    isolation: FetchIsolation = FetchLogEnd,
-    clientMetadata: Option[ClientMetadata] = None
+                               replicaManager: ReplicaManager,
+                               replicaId: Int,
+                               fetchInfos: Seq[(TopicIdPartition, PartitionData)],
+                               responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit,
+                               requestVersion: Short = ApiKeys.FETCH.latestVersion,
+                               maxWaitMs: Long = 0,
+                               minBytes: Int = 1,
+                               maxBytes: Int = 1024 * 1024,
+                               quota: ReplicaQuota = UnboundedQuota,
+                               isolation: FetchIsolation = FetchIsolation.LOG_END,
+                               clientMetadata: Option[ClientMetadata] = None

Review Comment:
   Reverting.





[GitHub] [kafka] ijuma commented on a diff in pull request #13085: KAFKA-14568: Move FetchDataInfo and related to storage module

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #13085:
URL: https://github.com/apache/kafka/pull/13085#discussion_r1068799077


##########
storage/src/main/java/org/apache/kafka/server/log/internals/FetchDataInfo.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Records;
+
+import java.util.List;
+import java.util.Optional;
+
+public class FetchDataInfo {
+    private final LogOffsetMetadata fetchOffsetMetadata;
+    private final Records records;
+    private final boolean firstEntryIncomplete;
+    private final Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions;
+
+    public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata,
+                         Records records) {
+        this(fetchOffsetMetadata, records, false, Optional.empty());
+    }
+
+    public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata,
+                         Records records,
+                         boolean firstEntryIncomplete) {
+        this(fetchOffsetMetadata, records, firstEntryIncomplete, Optional.empty());
+    }
+
+    public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata,
+                         Records records,
+                         Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions) {
+        this(fetchOffsetMetadata, records, false, abortedTransactions);
+    }
+
+    public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata,
+                         Records records,
+                         boolean firstEntryIncomplete,
+                         Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions) {

Review Comment:
   I reduced them a bit to make it more manageable.





[GitHub] [kafka] fvaleri commented on a diff in pull request #13085: KAFKA-14568: Move FetchDataInfo and related to storage module

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #13085:
URL: https://github.com/apache/kafka/pull/13085#discussion_r1064118091


##########
storage/src/main/java/org/apache/kafka/server/log/internals/FetchParams.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.replica.ClientMetadata;
+import org.apache.kafka.common.utils.FetchRequestUtils;
+
+import java.util.Objects;
+import java.util.Optional;
+
+public class FetchParams {
+    private final short requestVersion;
+    private final int replicaId;
+    private final long maxWaitMs;
+    private final int minBytes;
+    private final int maxBytes;
+    private final FetchIsolation isolation;
+    private Optional<ClientMetadata> clientMetadata;

Review Comment:
   Fixing.





[GitHub] [kafka] fvaleri commented on a diff in pull request #13085: KAFKA-14568: Move FetchDataInfo and related to storage module

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #13085:
URL: https://github.com/apache/kafka/pull/13085#discussion_r1064152167


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -123,12 +123,12 @@ case class LogReadResult(info: FetchDataInfo,
     this.info.records,
     this.divergingEpoch,
     this.lastStableOffset,
-    this.info.abortedTransactions,
+    if (this.info.abortedTransactions.isPresent) Some(this.info.abortedTransactions.get().asScala.toList) else None,

Review Comment:
   Agree.





[GitHub] [kafka] ijuma commented on a diff in pull request #13085: KAFKA-14568: Move FetchDataInfo and related to storage module

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #13085:
URL: https://github.com/apache/kafka/pull/13085#discussion_r1063613984


##########
storage/src/main/java/org/apache/kafka/server/log/internals/FetchIsolation.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.utils.FetchRequestUtils;
+
+public interface FetchIsolation {

Review Comment:
   And it's better if it's compiled to a class generally speaking.





[GitHub] [kafka] ijuma commented on a diff in pull request #13085: KAFKA-14568: Move FetchDataInfo and related to storage module

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #13085:
URL: https://github.com/apache/kafka/pull/13085#discussion_r1068799278


##########
storage/src/main/java/org/apache/kafka/server/log/internals/FetchParams.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.replica.ClientMetadata;
+import org.apache.kafka.common.utils.FetchRequestUtils;
+
+import java.util.Objects;
+import java.util.Optional;
+
+public class FetchParams {
+    private final short requestVersion;
+    private final int replicaId;
+    private final long maxWaitMs;
+    private final int minBytes;
+    private final int maxBytes;
+    private final FetchIsolation isolation;
+    private Optional<ClientMetadata> clientMetadata;
+
+    public FetchParams(short requestVersion,
+                       int replicaId,
+                       long maxWaitMs,
+                       int minBytes,
+                       int maxBytes,
+                       FetchIsolation isolation,
+                       Optional<ClientMetadata> clientMetadata) {
+        this.requestVersion = requestVersion;
+        this.replicaId = replicaId;
+        this.maxWaitMs = maxWaitMs;
+        this.minBytes = minBytes;
+        this.maxBytes = maxBytes;
+        this.isolation = isolation;
+        this.clientMetadata = clientMetadata;
+    }
+
+    public boolean isFromFollower() {
+        return FetchRequestUtils.isValidBrokerId(replicaId);
+    }
+
+    public boolean isFromConsumer() {
+        return FetchRequestUtils.isConsumer(replicaId);
+    }
+
+    public boolean fetchOnlyLeader() {
+        return isFromFollower() || (isFromConsumer() && !clientMetadata.isPresent());
+    }
+
+    public boolean hardMaxBytesLimit() {
+        return requestVersion <= 2;
+    }
+
+    public short requestVersion() {
+        return requestVersion;
+    }
+
+    public int replicaId() {
+        return replicaId;
+    }
+
+    public long maxWaitMs() {
+        return maxWaitMs;
+    }
+
+    public int minBytes() {
+        return minBytes;
+    }
+
+    public int maxBytes() {
+        return maxBytes;
+    }
+
+    public FetchIsolation isolation() {
+        return isolation;
+    }
+
+    public Optional<ClientMetadata> clientMetadata() {
+        return clientMetadata;
+    }
+
+    @Override
+    public boolean equals(Object o) {

Review Comment:
   Fair enough, I adjusted the implementations a bit.





[GitHub] [kafka] fvaleri commented on a diff in pull request #13085: KAFKA-14568: Move FetchDataInfo and related to storage module

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #13085:
URL: https://github.com/apache/kafka/pull/13085#discussion_r1064152167


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -123,12 +123,12 @@ case class LogReadResult(info: FetchDataInfo,
     this.info.records,
     this.divergingEpoch,
     this.lastStableOffset,
-    this.info.abortedTransactions,
+    if (this.info.abortedTransactions.isPresent) Some(this.info.abortedTransactions.get().asScala.toList) else None,

Review Comment:
   Agree. I also moved `FetchPartitionData` to get rid of this.





[GitHub] [kafka] ijuma commented on a diff in pull request #13085: KAFKA-14568: Move FetchDataInfo and related to storage module

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #13085:
URL: https://github.com/apache/kafka/pull/13085#discussion_r1063613984


##########
storage/src/main/java/org/apache/kafka/server/log/internals/FetchIsolation.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.utils.FetchRequestUtils;
+
+public interface FetchIsolation {

Review Comment:
   And it's better if it's compiled to a class generally speaking (lower number of potential performance pitfalls).





[GitHub] [kafka] ijuma commented on a diff in pull request #13085: KAFKA-14568: Move FetchDataInfo and related to storage module

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #13085:
URL: https://github.com/apache/kafka/pull/13085#discussion_r1068798767


##########
core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala:
##########
@@ -276,25 +276,22 @@ class ReplicaAlterLogDirsThreadTest {
     val callbackCaptor: ArgumentCaptor[Seq[(TopicIdPartition, FetchPartitionData)] => Unit] =
       ArgumentCaptor.forClass(classOf[Seq[(TopicIdPartition, FetchPartitionData)] => Unit])
 
-    val expectedFetchParams = FetchParams(
-      requestVersion = ApiKeys.FETCH.latestVersion,
-      replicaId = Request.FutureLocalReplicaId,
-      maxWaitMs = 0L,
-      minBytes = 0,
-      maxBytes = config.replicaFetchResponseMaxBytes,
-      isolation = FetchLogEnd,
-      clientMetadata = None
+    val expectedFetchParams = new FetchParams(
+      ApiKeys.FETCH.latestVersion,
+      FetchRequest.FUTURE_LOCAL_REPLICA_ID,
+      0L,
+      0,
+      config.replicaFetchResponseMaxBytes,
+      FetchIsolation.LOG_END,
+      Optional.empty()
     )
 
-    println(expectedFetchParams)
-
     when(replicaManager.fetchMessages(
       params = ArgumentMatchers.eq(expectedFetchParams),
       fetchInfos = ArgumentMatchers.eq(Seq(topicIdPartition -> requestData)),
       quota = ArgumentMatchers.eq(UnboundedQuota),
       responseCallback = callbackCaptor.capture(),
     )).thenAnswer(_ => {
-      println("Did we get the callback?")

Review Comment:
   Deleted two stray `println` here.





[GitHub] [kafka] fvaleri commented on a diff in pull request #13085: KAFKA-14568: Move FetchDataInfo and related to storage module

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #13085:
URL: https://github.com/apache/kafka/pull/13085#discussion_r1063594153


##########
storage/src/main/java/org/apache/kafka/server/log/internals/FetchIsolation.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.utils.FetchRequestUtils;
+
+public interface FetchIsolation {

Review Comment:
   I thought about that, but then I used `javap` to decompile the class and this is how it comes out.





[GitHub] [kafka] fvaleri commented on a diff in pull request #13085: KAFKA-14568: Move FetchDataInfo and related to storage module

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #13085:
URL: https://github.com/apache/kafka/pull/13085#discussion_r1063609877


##########
storage/src/main/java/org/apache/kafka/server/log/internals/FetchIsolation.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.utils.FetchRequestUtils;
+
+public interface FetchIsolation {

Review Comment:
   The relevance is that an enum is compiled into a class, while a sealed trait is compiled into an interface. Anyway, I'll try changing it to an enum.
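
   The class-versus-interface distinction can also be checked with reflection instead of `javap`. This is a minimal sketch, assuming two hypothetical stand-in types (`IsolationEnum`, `IsolationTrait`) rather than the actual Kafka classes:

```java
// Hypothetical stand-ins for illustration; these are not the Kafka sources.
enum IsolationEnum { LOG_END, HIGH_WATERMARK, TXN_COMMITTED }

// Rough Java analogue of the interface a Scala sealed trait is compiled into.
interface IsolationTrait { }

final class EnumShapeDemo {
    // Classifies a type the way the discussion does: enum class, interface, or plain class.
    static String shape(Class<?> type) {
        if (type.isEnum()) return "enum class";
        if (type.isInterface()) return "interface";
        return "class";
    }

    public static void main(String[] args) {
        System.out.println(shape(IsolationEnum.class));   // enum class
        System.out.println(shape(IsolationTrait.class));  // interface
        // An enum without constant-specific bodies extends java.lang.Enum directly.
        System.out.println(IsolationEnum.class.getSuperclass().getName()); // java.lang.Enum
    }
}
```

   The bytecode shape differs, but that does not by itself decide which construct models the concept better.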





[GitHub] [kafka] fvaleri commented on a diff in pull request #13085: KAFKA-14568: Move FetchDataInfo and related to storage module

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #13085:
URL: https://github.com/apache/kafka/pull/13085#discussion_r1064119776


##########
storage/src/main/java/org/apache/kafka/server/log/internals/FetchParams.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.replica.ClientMetadata;
+import org.apache.kafka.common.utils.FetchRequestUtils;
+
+import java.util.Objects;
+import java.util.Optional;
+
+public class FetchParams {
+    private final short requestVersion;
+    private final int replicaId;
+    private final long maxWaitMs;
+    private final int minBytes;
+    private final int maxBytes;
+    private final FetchIsolation isolation;
+    private Optional<ClientMetadata> clientMetadata;
+
+    public FetchParams(short requestVersion,
+                       int replicaId,
+                       long maxWaitMs,
+                       int minBytes,
+                       int maxBytes,
+                       FetchIsolation isolation,
+                       Optional<ClientMetadata> clientMetadata) {
+        this.requestVersion = requestVersion;
+        this.replicaId = replicaId;
+        this.maxWaitMs = maxWaitMs;
+        this.minBytes = minBytes;
+        this.maxBytes = maxBytes;
+        this.isolation = isolation;
+        this.clientMetadata = clientMetadata;
+    }
+
+    public boolean isFromFollower() {
+        return FetchRequestUtils.isValidBrokerId(replicaId);
+    }
+
+    public boolean isFromConsumer() {
+        return FetchRequestUtils.isConsumer(replicaId);
+    }
+
+    public boolean fetchOnlyLeader() {
+        return isFromFollower() || (isFromConsumer() && !clientMetadata.isPresent());
+    }
+
+    public boolean hardMaxBytesLimit() {
+        return requestVersion <= 2;
+    }
+
+    public short requestVersion() {
+        return requestVersion;
+    }
+
+    public int replicaId() {
+        return replicaId;
+    }
+
+    public long maxWaitMs() {
+        return maxWaitMs;
+    }
+
+    public int minBytes() {
+        return minBytes;
+    }
+
+    public int maxBytes() {
+        return maxBytes;
+    }
+
+    public FetchIsolation isolation() {
+        return isolation;
+    }
+
+    public Optional<ClientMetadata> clientMetadata() {
+        return clientMetadata;
+    }
+
+    @Override
+    public boolean equals(Object o) {

Review Comment:
   Yes, they are needed for the following tests:
   
   - ReplicaAlterLogDirsThreadTest.shouldReplaceCurrentLogDirWhenCaughtUp
   - ReplicaAlterLogDirsThreadTest.shouldUpdateLeaderEpochAfterFencedEpochError
   
   They both call `mockFetchFromCurrentLog`, which uses an equality matcher on an instance of `FetchParams`.
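
   To illustrate why an equality matcher depends on these methods, here is a minimal sketch. `SimpleParams` is a hypothetical, trimmed-down stand-in for a parameter holder such as `FetchParams`, and `matches` only mimics what an equality-based argument matcher does; no mocking library is used.

```java
import java.util.Objects;

// Hypothetical stand-in with two fields; the real class has more.
final class SimpleParams {
    final int replicaId;
    final long maxWaitMs;

    SimpleParams(int replicaId, long maxWaitMs) {
        this.replicaId = replicaId;
        this.maxWaitMs = maxWaitMs;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        SimpleParams that = (SimpleParams) o;
        return replicaId == that.replicaId && maxWaitMs == that.maxWaitMs;
    }

    @Override
    public int hashCode() {
        return Objects.hash(replicaId, maxWaitMs);
    }
}

final class EqualsMatcherDemo {
    // Mimics an equality-based matcher: the expected and actual arguments are compared with equals().
    static boolean matches(Object expected, Object actual) {
        return Objects.equals(expected, actual);
    }

    public static void main(String[] args) {
        SimpleParams expected = new SimpleParams(1, 500L);
        SimpleParams actual = new SimpleParams(1, 500L);
        System.out.println(matches(expected, actual));        // true: distinct objects, same values
        System.out.println(matches(expected, new Object()));  // false
    }
}
```

   Without the `equals` override, the first comparison would fall back to reference equality and fail for separately constructed instances.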





[GitHub] [kafka] fvaleri commented on pull request #13085: KAFKA-14568: Move FetchDataInfo and related to storage module

Posted by GitBox <gi...@apache.org>.
fvaleri commented on PR #13085:
URL: https://github.com/apache/kafka/pull/13085#issuecomment-1373794678

   @ijuma @satishd fyi




[GitHub] [kafka] fvaleri commented on pull request #13085: KAFKA-14568: Move FetchDataInfo and related to storage module

Posted by GitBox <gi...@apache.org>.
fvaleri commented on PR #13085:
URL: https://github.com/apache/kafka/pull/13085#issuecomment-1374849730

   > Note that the fetch path is performance sensitive, so we have to be careful with collection conversions and other such things.
   
   Yes and thanks for the detailed reviews. Much appreciated.
   




[GitHub] [kafka] fvaleri commented on pull request #13085: KAFKA-14568: Move FetchDataInfo and related to storage module

Posted by GitBox <gi...@apache.org>.
fvaleri commented on PR #13085:
URL: https://github.com/apache/kafka/pull/13085#issuecomment-1381478406

   > I pushed a few clean-ups and it LGTM now. @fvaleri Are you good with them too?
   
   Yes, good. Thanks.




[GitHub] [kafka] ijuma merged pull request #13085: KAFKA-14568: Move FetchDataInfo and related to storage module

Posted by GitBox <gi...@apache.org>.
ijuma merged PR #13085:
URL: https://github.com/apache/kafka/pull/13085




[GitHub] [kafka] ijuma commented on a diff in pull request #13085: KAFKA-14568: Move FetchDataInfo and related to storage module

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #13085:
URL: https://github.com/apache/kafka/pull/13085#discussion_r1063598351


##########
storage/src/main/java/org/apache/kafka/server/log/internals/FetchIsolation.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.utils.FetchRequestUtils;
+
+public interface FetchIsolation {

Review Comment:
   A Java enum is the right way to represent `FetchIsolation`. In Scala, it's often emulated with a sealed trait and case objects. I don't understand the `javap` relevance here.
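
   As a rough sketch of what the enum form offers, the example below declares a hypothetical `Isolation` enum and branches on it with a `switch`. The returned labels are illustrative only and are not taken from the Kafka code.

```java
// Hypothetical enum for illustration; constant names are placeholders.
enum Isolation { LOG_END, HIGH_WATERMARK, TXN_COMMITTED }

final class IsolationSwitchDemo {
    // Maps each constant to a label naming the upper bound a read might use.
    static String upperBound(Isolation isolation) {
        switch (isolation) {
            case LOG_END:
                return "log end offset";
            case HIGH_WATERMARK:
                return "high watermark";
            case TXN_COMMITTED:
                return "last stable offset";
            default:
                throw new IllegalArgumentException("Unknown isolation: " + isolation);
        }
    }

    public static void main(String[] args) {
        // values() enumerates the closed set of constants, in declaration order.
        for (Isolation isolation : Isolation.values()) {
            System.out.println(isolation + " -> " + upperBound(isolation));
        }
    }
}
```

   The set of constants is closed, much like a sealed trait with case objects, and comparisons with `==` are safe because each constant is a singleton.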





[GitHub] [kafka] ijuma commented on pull request #13085: KAFKA-14568: Move FetchDataInfo and related to storage module

Posted by GitBox <gi...@apache.org>.
ijuma commented on PR #13085:
URL: https://github.com/apache/kafka/pull/13085#issuecomment-1381334743

   JDK 8 and 11 builds passed, 17 had 2 unrelated failures.




[GitHub] [kafka] fvaleri commented on pull request #13085: KAFKA-14568: Move FetchDataInfo and related to storage module

Posted by GitBox <gi...@apache.org>.
fvaleri commented on PR #13085:
URL: https://github.com/apache/kafka/pull/13085#issuecomment-1374423774

   Let's wait for CI builds, but it should be good now.




[GitHub] [kafka] ijuma commented on a diff in pull request #13085: KAFKA-14568: Move FetchDataInfo and related to storage module

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #13085:
URL: https://github.com/apache/kafka/pull/13085#discussion_r1064083858


##########
clients/src/main/java/org/apache/kafka/common/utils/FetchRequestUtils.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+public class FetchRequestUtils {

Review Comment:
   I would just add these methods to the existing `FetchRequest` class.



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -2312,26 +2313,26 @@ class ReplicaManagerTest {
   }
 
   private def fetchPartitions(
-    replicaManager: ReplicaManager,
-    replicaId: Int,
-    fetchInfos: Seq[(TopicIdPartition, PartitionData)],
-    responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit,
-    requestVersion: Short = ApiKeys.FETCH.latestVersion,
-    maxWaitMs: Long = 0,
-    minBytes: Int = 1,
-    maxBytes: Int = 1024 * 1024,
-    quota: ReplicaQuota = UnboundedQuota,
-    isolation: FetchIsolation = FetchLogEnd,
-    clientMetadata: Option[ClientMetadata] = None
+                               replicaManager: ReplicaManager,
+                               replicaId: Int,
+                               fetchInfos: Seq[(TopicIdPartition, PartitionData)],
+                               responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit,
+                               requestVersion: Short = ApiKeys.FETCH.latestVersion,
+                               maxWaitMs: Long = 0,
+                               minBytes: Int = 1,
+                               maxBytes: Int = 1024 * 1024,
+                               quota: ReplicaQuota = UnboundedQuota,
+                               isolation: FetchIsolation = FetchIsolation.LOG_END,
+                               clientMetadata: Option[ClientMetadata] = None

Review Comment:
   Why this indent change?



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -123,12 +123,12 @@ case class LogReadResult(info: FetchDataInfo,
     this.info.records,
     this.divergingEpoch,
     this.lastStableOffset,
-    this.info.abortedTransactions,
+    if (this.info.abortedTransactions.isPresent) Some(this.info.abortedTransactions.get().asScala.toList) else None,

Review Comment:
   The final `toList` copies the collection; we'd want to avoid that.



##########
storage/src/main/java/org/apache/kafka/server/log/internals/FetchDataInfo.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Records;
+
+import java.util.List;
+import java.util.Optional;
+
+public class FetchDataInfo {
+    private final LogOffsetMetadata fetchOffsetMetadata;
+    private final Records records;
+    private final boolean firstEntryIncomplete;
+    private final Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions;
+
+    public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata,
+                         Records records) {
+        this(fetchOffsetMetadata, records, false, Optional.empty());
+    }
+
+    public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata,
+                         Records records,
+                         boolean firstEntryIncomplete) {
+        this(fetchOffsetMetadata, records, firstEntryIncomplete, Optional.empty());
+    }
+
+    public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata,
+                         Records records,
+                         Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions) {
+        this(fetchOffsetMetadata, records, false, abortedTransactions);
+    }
+
+    public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata,
+                         Records records,
+                         boolean firstEntryIncomplete,
+                         Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions) {

Review Comment:
   We have a lot of constructors; do we need all of them?



##########
core/src/main/scala/kafka/log/LocalLog.scala:
##########
@@ -1005,12 +1005,12 @@ object LocalLog extends Logging {
 
   private[log] def emptyFetchDataInfo(fetchOffsetMetadata: LogOffsetMetadata,
                                       includeAbortedTxns: Boolean): FetchDataInfo = {
-    val abortedTransactions =
-      if (includeAbortedTxns) Some(List.empty[FetchResponseData.AbortedTransaction])
-      else None
-    FetchDataInfo(fetchOffsetMetadata,
+    val abortedTransactions: Optional[java.util.List[FetchResponseData.AbortedTransaction]] =
+      if (includeAbortedTxns) Optional.of(List.empty[FetchResponseData.AbortedTransaction].asJava)

Review Comment:
   We can pass `Collections.emptyList` instead of creating a Scala list and then converting to Java.
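
   A minimal sketch of the suggestion, written in Java for illustration (the reviewed call site is Scala). The helper name `abortedTransactions` is hypothetical, and `String` stands in for `FetchResponseData.AbortedTransaction`:

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;

final class EmptyAbortedTxnsDemo {
    // Hypothetical helper: returns an empty Java list directly, with no intermediate
    // Scala collection and no conversion step.
    static Optional<List<String>> abortedTransactions(boolean includeAbortedTxns) {
        if (includeAbortedTxns) {
            return Optional.of(Collections.emptyList());
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(abortedTransactions(true).isPresent());   // true
        System.out.println(abortedTransactions(true).get().size());  // 0
        System.out.println(abortedTransactions(false).isPresent());  // false
    }
}
```

   The list returned by `Collections.emptyList` is immutable, so this only fits when callers do not add elements to it afterwards.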



##########
storage/src/main/java/org/apache/kafka/server/log/internals/FetchParams.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.replica.ClientMetadata;
+import org.apache.kafka.common.utils.FetchRequestUtils;
+
+import java.util.Objects;
+import java.util.Optional;
+
+public class FetchParams {
+    private final short requestVersion;
+    private final int replicaId;
+    private final long maxWaitMs;
+    private final int minBytes;
+    private final int maxBytes;
+    private final FetchIsolation isolation;
+    private Optional<ClientMetadata> clientMetadata;

Review Comment:
   How come this isn't final?



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -1189,7 +1189,7 @@ class Partition(val topicPartition: TopicPartition,
    * @param minOneMessage whether to ensure that at least one complete message is returned
    * @param updateFetchState true if the Fetch should update replica state (only applies to follower fetches)
    * @return [[LogReadInfo]] containing the fetched records or the diverging epoch if present
-   * @throws NotLeaderOrFollowerException if this node is not the current leader and [[FetchParams.fetchOnlyLeader]]
+   * @throws NotLeaderOrFollowerException if this node is not the current leader and `FetchParams.fetchOnlyLeader`

Review Comment:
   Are these changes related to this PR?



##########
storage/src/main/java/org/apache/kafka/server/log/internals/FetchIsolation.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.utils.FetchRequestUtils;
+
+public enum FetchIsolation {
+    LOG_END,
+    HIGH_WATERMARK,
+    TXN_COMMITTED;
+
+    public static FetchIsolation apply(FetchRequest request) {

Review Comment:
   I'd call this method and the other methods `of`. That's more common for Java.
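
   For illustration, a static factory named `of` could look like the sketch below. `FetchIsolationSketch`, its boolean parameters, and the selection rule are all assumptions made for the example, not the signatures or logic used in the PR.

```java
// Hypothetical enum; the parameters and the selection rule are assumptions.
enum FetchIsolationSketch {
    LOG_END,
    HIGH_WATERMARK,
    TXN_COMMITTED;

    // Static factory named `of`, in the style of Optional.of or EnumSet.of.
    static FetchIsolationSketch of(boolean fromConsumer, boolean readCommitted) {
        if (!fromConsumer) {
            return LOG_END;
        }
        return readCommitted ? TXN_COMMITTED : HIGH_WATERMARK;
    }
}

final class FactoryNamingDemo {
    public static void main(String[] args) {
        System.out.println(FetchIsolationSketch.of(false, false)); // LOG_END
        System.out.println(FetchIsolationSketch.of(true, true));   // TXN_COMMITTED
        System.out.println(FetchIsolationSketch.of(true, false));  // HIGH_WATERMARK
    }
}
```

   Overloads can share the name `of` as long as their parameter lists differ, which is how several factories on the same type would coexist.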



##########
storage/src/main/java/org/apache/kafka/server/log/internals/FetchParams.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.replica.ClientMetadata;
+import org.apache.kafka.common.utils.FetchRequestUtils;
+
+import java.util.Objects;
+import java.util.Optional;
+
+public class FetchParams {
+    private final short requestVersion;
+    private final int replicaId;
+    private final long maxWaitMs;
+    private final int minBytes;
+    private final int maxBytes;
+    private final FetchIsolation isolation;

Review Comment:
   We can make these fields public since they're immutable. The accessors don't add much value in such cases.
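
   A minimal sketch of that style, using a hypothetical two-field class (`ParamsWithPublicFields`) rather than the full `FetchParams`:

```java
// Hypothetical, reduced example: immutable state exposed as public final fields.
final class ParamsWithPublicFields {
    public final short requestVersion;
    public final int maxBytes;

    ParamsWithPublicFields(short requestVersion, int maxBytes) {
        this.requestVersion = requestVersion;
        this.maxBytes = maxBytes;
    }

    // Derived values still belong in methods; only the trivial accessors go away.
    public boolean hardMaxBytesLimit() {
        return requestVersion <= 2;
    }
}

final class PublicFieldsDemo {
    public static void main(String[] args) {
        ParamsWithPublicFields params = new ParamsWithPublicFields((short) 2, 1024);
        System.out.println(params.maxBytes);            // 1024
        System.out.println(params.hardMaxBytesLimit()); // true
    }
}
```

   Because the fields are `final` and of primitive type here, callers cannot change the object's state after construction; a field holding a mutable object would need more care.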



##########
storage/src/main/java/org/apache/kafka/server/log/internals/FetchParams.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.replica.ClientMetadata;
+import org.apache.kafka.common.utils.FetchRequestUtils;
+
+import java.util.Objects;
+import java.util.Optional;
+
+public class FetchParams {
+    private final short requestVersion;
+    private final int replicaId;
+    private final long maxWaitMs;
+    private final int minBytes;
+    private final int maxBytes;
+    private final FetchIsolation isolation;
+    private Optional<ClientMetadata> clientMetadata;
+
+    public FetchParams(short requestVersion,
+                       int replicaId,
+                       long maxWaitMs,
+                       int minBytes,
+                       int maxBytes,
+                       FetchIsolation isolation,
+                       Optional<ClientMetadata> clientMetadata) {
+        this.requestVersion = requestVersion;
+        this.replicaId = replicaId;
+        this.maxWaitMs = maxWaitMs;
+        this.minBytes = minBytes;
+        this.maxBytes = maxBytes;
+        this.isolation = isolation;
+        this.clientMetadata = clientMetadata;
+    }
+
+    public boolean isFromFollower() {
+        return FetchRequestUtils.isValidBrokerId(replicaId);
+    }
+
+    public boolean isFromConsumer() {
+        return FetchRequestUtils.isConsumer(replicaId);
+    }
+
+    public boolean fetchOnlyLeader() {
+        return isFromFollower() || (isFromConsumer() && !clientMetadata.isPresent());
+    }
+
+    public boolean hardMaxBytesLimit() {
+        return requestVersion <= 2;
+    }
+
+    public short requestVersion() {
+        return requestVersion;
+    }
+
+    public int replicaId() {
+        return replicaId;
+    }
+
+    public long maxWaitMs() {
+        return maxWaitMs;
+    }
+
+    public int minBytes() {
+        return minBytes;
+    }
+
+    public int maxBytes() {
+        return maxBytes;
+    }
+
+    public FetchIsolation isolation() {
+        return isolation;
+    }
+
+    public Optional<ClientMetadata> clientMetadata() {
+        return clientMetadata;
+    }
+
+    @Override
+    public boolean equals(Object o) {

Review Comment:
   Do we actually need to implement `equals` and `hashCode` for this? Is it used as the key of a map or something like that? I suspect it may not be needed and it was only there because it comes for free with case classes. Same question for other classes where we have implemented these methods.





[GitHub] [kafka] fvaleri commented on a diff in pull request #13085: KAFKA-14568: Move FetchDataInfo and related to storage module

Posted by GitBox <gi...@apache.org>.
fvaleri commented on code in PR #13085:
URL: https://github.com/apache/kafka/pull/13085#discussion_r1064116819


##########
storage/src/main/java/org/apache/kafka/server/log/internals/FetchDataInfo.java:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Records;
+
+import java.util.List;
+import java.util.Optional;
+
+public class FetchDataInfo {
+    private final LogOffsetMetadata fetchOffsetMetadata;
+    private final Records records;
+    private final boolean firstEntryIncomplete;
+    private final Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions;
+
+    public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata,
+                         Records records) {
+        this(fetchOffsetMetadata, records, false, Optional.empty());
+    }
+
+    public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata,
+                         Records records,
+                         boolean firstEntryIncomplete) {
+        this(fetchOffsetMetadata, records, firstEntryIncomplete, Optional.empty());
+    }
+
+    public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata,
+                         Records records,
+                         Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions) {
+        this(fetchOffsetMetadata, records, false, abortedTransactions);
+    }
+
+    public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata,
+                         Records records,
+                         boolean firstEntryIncomplete,
+                         Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions) {

Review Comment:
   I added them based on existing code usage.





[GitHub] [kafka] ijuma commented on a diff in pull request #13085: KAFKA-14568: Move FetchDataInfo and related to storage module

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #13085:
URL: https://github.com/apache/kafka/pull/13085#discussion_r1063688608


##########
storage/src/main/java/org/apache/kafka/server/log/internals/FetchIsolation.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.internals;
+
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.utils.FetchRequestUtils;
+
+public enum FetchIsolation {
+    FETCH_LOG_END,
+    FETCH_HIGH_WATERMARK,
+    FETCH_TXN_COMMITTED;

Review Comment:
   We can probably remove the `FETCH` prefix.





[GitHub] [kafka] ijuma commented on a diff in pull request #13085: KAFKA-14568: Move FetchDataInfo and related to storage module

Posted by GitBox <gi...@apache.org>.
ijuma commented on code in PR #13085:
URL: https://github.com/apache/kafka/pull/13085#discussion_r1063687740


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1249,12 +1249,10 @@ class UnifiedLog(@volatile var logStartOffset: Long,
            isolation: FetchIsolation,
            minOneMessage: Boolean): FetchDataInfo = {
     checkLogStartOffset(startOffset)
-    val maxOffsetMetadata = isolation match {
-      case FetchLogEnd => localLog.logEndOffsetMetadata
-      case FetchHighWatermark => fetchHighWatermarkMetadata
-      case FetchTxnCommitted => fetchLastStableOffsetMetadata
-    }
-    localLog.read(startOffset, maxLength, minOneMessage, maxOffsetMetadata, isolation == FetchTxnCommitted)
+    var maxOffsetMetadata = localLog.logEndOffsetMetadata
+    if (isolation == FetchIsolation.FETCH_HIGH_WATERMARK) maxOffsetMetadata = fetchHighWatermarkMetadata
+    else if (isolation == FetchIsolation.FETCH_TXN_COMMITTED) maxOffsetMetadata = fetchLastStableOffsetMetadata

Review Comment:
   I think you can keep the `match` here as it was.


