Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/12/07 16:58:34 UTC

[GitHub] [kafka] divijvaidya commented on a diff in pull request #11390: [KAFKA-13369] Follower fetch protocol changes for tiered storage.

divijvaidya commented on code in PR #11390:
URL: https://github.com/apache/kafka/pull/11390#discussion_r1042140948


##########
core/src/main/scala/kafka/server/ReplicaFetcherThread.scala:
##########
@@ -192,4 +204,140 @@ class ReplicaFetcherThread(name: String,
     partition.truncateFullyAndStartAt(offset, isFuture = false)
   }
 
+  def buildProducerSnapshotFile(snapshotFile: File, remoteLogSegmentMetadata: RemoteLogSegmentMetadata, rlm: RemoteLogManager): Unit = {
+    val tmpSnapshotFile = new File(snapshotFile.getAbsolutePath + ".tmp")
+    // Copy it to snapshot file in atomic manner.
+    Files.copy(rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT),
+      tmpSnapshotFile.toPath, StandardCopyOption.REPLACE_EXISTING)
+    Utils.atomicMoveWithFallback(tmpSnapshotFile.toPath, snapshotFile.toPath, false)
+  }
+
+  /**
+   * It tries to build the required state for this partition from leader and remote storage so that it can start
+   * fetching records from the leader.
+   */
+  override protected def buildRemoteLogAuxState(partition: TopicPartition,
+                                                currentLeaderEpoch: Int,
+                                                leaderLocalLogStartOffset: Long,
+                                                epochForLeaderLocalLogStartOffset: Int,
+                                                leaderLogStartOffset: Long): Long = {
+
+    def fetchEarlierEpochEndOffset(epoch: Int): EpochEndOffset = {
+      val previousEpoch = epoch - 1
+      // Find the end-offset for the epoch earlier to the given epoch from the leader
+      val partitionsWithEpochs = Map(partition -> new EpochData().setPartition(partition.partition())
+        .setCurrentLeaderEpoch(currentLeaderEpoch)
+        .setLeaderEpoch(previousEpoch))
+      val maybeEpochEndOffset = leader.fetchEpochEndOffsets(partitionsWithEpochs).get(partition)
+      if (maybeEpochEndOffset.isEmpty) {
+        throw new KafkaException("No response received for partition: " + partition);
+      }
+
+      val epochEndOffset = maybeEpochEndOffset.get
+      if (epochEndOffset.errorCode() != Errors.NONE.code()) {
+        throw Errors.forCode(epochEndOffset.errorCode()).exception()
+      }
+
+      epochEndOffset
+    }
+
+    val log = replicaMgr.localLogOrException(partition)
+    val nextOffset = {
+      if (log.remoteStorageSystemEnable && log.config.remoteLogConfig.remoteStorageEnable) {

Review Comment:
   Consider the following scenario:
   1. The leader is archiving to tiered storage and has a follower.
   2. The follower has caught up to offset X (exclusive).
   3. While the follower is offline, the leader moves X to tiered storage and expires its local data up to Y, so that Y = leaderLocalLogStartOffset > X. Meanwhile, X has also expired from tiered storage, so X < globalLogStartOffset as well. It is even possible that globalLogStartOffset > leaderLocalLogStartOffset, because segments have been expired from remote storage but not yet from local storage.
   4. The follower comes online and tries to fetch X from the leader; the leader responds with a moved-to-tiered-storage error.
   5. The follower moves to the build-aux-state step (buildRemoteLogAuxState) and tries to fetch the remote log segment metadata. That metadata may no longer exist, since the segment has been deleted from remote storage, so we will get an error.
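The offset ordering in this scenario is easy to lose track of, so here is a minimal sketch of the state after step 3 using made-up offsets. The `LeaderOffsets` case class, the `ScenarioState` object and all numeric values are hypothetical and only illustrate the relations described above; they are not Kafka types.

```scala
// Hypothetical model of the leader's offsets; not a Kafka type.
final case class LeaderOffsets(
    logStartOffset: Long,     // global log start offset (remote + local)
    localLogStartOffset: Long // first offset still present in the leader's local log
)

object ScenarioState {
  // X: the follower has fetched everything below this offset (illustrative value).
  val followerFetchOffset: Long = 100L

  // After step 3 (illustrative values): local data expired up to Y = 250,
  // remote data expired up to 300, so remote expiry has overtaken local expiry.
  val leader: LeaderOffsets = LeaderOffsets(logStartOffset = 300L, localLogStartOffset = 250L)

  // X is no longer in the leader's local log, so the fetch is answered with a
  // moved-to-tiered-storage error and the follower tries to build aux state.
  val fetchHitsTieredStorage: Boolean = followerFetchOffset < leader.localLogStartOffset

  // X has also expired from remote storage: it is below the global log start offset.
  val offsetExpiredEverywhere: Boolean = followerFetchOffset < leader.logStartOffset

  // Remote expiry ran ahead of local expiry, so the remote segment metadata the
  // follower needs while building aux state may no longer exist.
  val remoteAheadOfLocal: Boolean = leader.logStartOffset > leader.localLogStartOffset
}
```

All three flags are true for these values, which is exactly the combination that makes buildRemoteLogAuxState fail.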
   
   This could be addressed in the replica manager, which could detect that the remote segments have been deleted and throw an offset-out-of-range error instead of a moved-to-tiered-storage error. But we should also add a defensive check here: in the scenario above, the follower should move directly to truncation instead of building the aux state.
   
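   The defensive check could be to append `&& leaderLocalLogStartOffset > leaderLogStartOffset` to the condition on this line, so that the aux state is only built when the leader still has data below its local log start offset in remote storage.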
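A minimal sketch of how the proposed guard changes the decision, assuming the rest of the `if` in the diff stays as it is. `FollowerAction`, `BuildRemoteLogAuxState`, `Truncate` and `AuxStateGuard.decideNextStep` are hypothetical names for illustration only; the real change would just extend the existing condition. The else branch of the PR is not shown in this excerpt, so every non-aux-state case is simply modelled as `Truncate`.

```scala
// Hypothetical outcome type for illustration; not part of Kafka.
sealed trait FollowerAction
case object BuildRemoteLogAuxState extends FollowerAction
case object Truncate extends FollowerAction

object AuxStateGuard {
  /** Models the branch on the reviewed line. The first two flags mirror the existing
    * condition in the diff; the offset comparison is the proposed defensive check. */
  def decideNextStep(
      remoteStorageSystemEnable: Boolean,
      topicRemoteStorageEnable: Boolean,
      leaderLocalLogStartOffset: Long,
      leaderLogStartOffset: Long
  ): FollowerAction =
    if (remoteStorageSystemEnable && topicRemoteStorageEnable &&
        leaderLocalLogStartOffset > leaderLogStartOffset)
      BuildRemoteLogAuxState // offsets [leaderLogStartOffset, leaderLocalLogStartOffset) live only in remote storage
    else
      Truncate // modelled fall-through: remote storage has nothing usable below the local start
}
```

With the scenario's offsets (local start 250, global start 300) the guard picks `Truncate`, while a healthy leader (local start 250, global start 0) still builds the aux state. The same pair of inputs would make a natural starting point for the requested test.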
   
   Also, please add a test for this scenario.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.