You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "dajac (via GitHub)" <gi...@apache.org> on 2023/05/24 08:13:19 UTC

[GitHub] [kafka] dajac commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

dajac commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1203664360


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1121,21 +1154,69 @@ class ReplicaManager(val config: KafkaConfig,
     partition.legacyFetchOffsetsForTimestamp(timestamp, maxNumOffsets, isFromConsumer, fetchOnlyFromLeader)
   }
 
+  /**
+   * Returns [[LogReadResult]] with error if a task for RemoteStorageFetchInfo could not be scheduled successfully
+   * else returns [[None]].
+   */
+  private def processRemoteFetch(remoteFetchInfo: RemoteStorageFetchInfo,
+                                 params: FetchParams,
+                                 responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit,
+                                 logReadResults: Seq[(TopicIdPartition, LogReadResult)],
+                                 fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)]): Option[LogReadResult] = {
+    val key = new TopicPartitionOperationKey(remoteFetchInfo.topicPartition.topic(), remoteFetchInfo.topicPartition.partition())
+    val remoteFetchResult = new CompletableFuture[RemoteLogReadResult]
+    var remoteFetchTask: Future[Void] = null
+    try {
+      remoteFetchTask = remoteLogManager.get.asyncRead(remoteFetchInfo, (result: RemoteLogReadResult) => {
+        remoteFetchResult.complete(result)
+        delayedRemoteFetchPurgatory.checkAndComplete(key)
+      })
+    } catch {
+      case e: RejectedExecutionException =>
+        // Return the error if any in scheduling the remote fetch task
+        return Some(createLogReadResult(e))
+    }
+
+    val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, remoteFetchResult, remoteFetchInfo,
+      fetchPartitionStatus, params, logReadResults, this, responseCallback)
+
+    delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, Seq(key))
+    None
+  }
+
+  private def buildPartitionToFetchPartitionData(logReadResults: Seq[(TopicIdPartition, LogReadResult)],
+                                                 remoteFetchTopicPartition: TopicPartition,
+                                                 error: LogReadResult): Seq[(TopicIdPartition, FetchPartitionData)] = {
+    logReadResults.map { case (tp, result) =>
+      val fetchPartitionData = {
+        if (tp.topicPartition().equals(remoteFetchTopicPartition))
+          error
+        else
+          result
+      }.toFetchPartitionData(false)
+
+      tp -> fetchPartitionData
+    }
+  }
+
   /**
    * Fetch messages from a replica, and wait until enough data can be fetched and return;
    * the callback function will be triggered either when timeout or required fetch info is satisfied.
    * Consumers may fetch from any replica, but followers can only fetch from the leader.
    */
-  def fetchMessages(
-    params: FetchParams,
-    fetchInfos: Seq[(TopicIdPartition, PartitionData)],
-    quota: ReplicaQuota,
-    responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit
-  ): Unit = {
+  def fetchMessages(params: FetchParams,

Review Comment:
   A small comment: we should avoid completely changing the code style without a reason. The format of the method was not a mistake; it is the format that we mainly use in this class nowadays.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org