You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/11/27 05:35:22 UTC

[GitHub] [kafka] satishd commented on a diff in pull request #11390: [KAFKA-13369] Follower fetch protocol changes for tiered storage.

satishd commented on code in PR #11390:
URL: https://github.com/apache/kafka/pull/11390#discussion_r1032869910


##########
core/src/main/scala/kafka/server/ReplicaFetcherThread.scala:
##########
@@ -386,11 +397,147 @@ class ReplicaFetcherThread(name: String,
   }
 
   /**
-   *  To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
-   *  the quota is exceeded and the replica is not in sync.
+   * To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
+   * the quota is exceeded and the replica is not in sync.
    */
   private def shouldFollowerThrottle(quota: ReplicaQuota, fetchState: PartitionFetchState, topicPartition: TopicPartition): Boolean = {
     !fetchState.isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded
   }
 
+  /**
+   * It tries to build the required state for this partition from leader and remote storage so that it can start
+   * fetching records from the leader.
+   */
+  override protected def buildRemoteLogAuxState(partition: TopicPartition,
+                                                currentLeaderEpoch: Int,
+                                                leaderLocalLogStartOffset: Long,
+                                                epochForLeaderLocalLogStartOffset: Int,
+                                                leaderLogStartOffset: Long): Long = {
+
+    def fetchEarlierEpochEndOffset(epoch: Int): EpochEndOffset = {
+      val previousEpoch = epoch - 1
+      // Ask the leader for the end-offset of the epoch that precedes the given epoch.
+      val partitionsWithEpochs = Map(partition -> new EpochData().setPartition(partition.partition())
+        .setCurrentLeaderEpoch(currentLeaderEpoch)
+        .setLeaderEpoch(previousEpoch))
+      val maybeEpochEndOffset = fetchEpochEndOffsetsFromLeader(partitionsWithEpochs).get(partition)
+      if (maybeEpochEndOffset.isEmpty) {
+        throw new KafkaException("No response received for partition: " + partition);
+      }
+
+      val epochEndOffset = maybeEpochEndOffset.get
+      if (epochEndOffset.errorCode() != Errors.NONE.code()) {
+        throw Errors.forCode(epochEndOffset.errorCode()).exception()
+      }
+
+      epochEndOffset
+    }
+
+    val log = replicaMgr.localLogOrException(partition)
+    val nextOffset = {
+      if (log.remoteStorageSystemEnable && log.config.remoteLogConfig.remoteStorageEnable) {
+        if (replicaMgr.remoteLogManager.isEmpty) throw new IllegalStateException("RemoteLogManager is not yet instantiated")
+
+        val rlm = replicaMgr.remoteLogManager.get
+
+        // Find the respective leader epoch for (leaderLocalLogStartOffset - 1). We need to build the leader epoch cache
+        // until that offset
+        val previousOffsetToLeaderLocalLogStartOffset = leaderLocalLogStartOffset - 1
+        val targetEpoch: Int = {
+          // If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset (leaderLocalLogStartOffset - 1)
+          // will have the same epoch.
+          if (epochForLeaderLocalLogStartOffset == 0) {
+            epochForLeaderLocalLogStartOffset
+          } else {
+            // Fetch the earlier epoch/end-offset(exclusive) from the leader.
+            val earlierEpochEndOffset = fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset)
+            // Check if the target offset lies within the range of the earlier epoch. Here, the epoch's end-offset is exclusive.
+            if (earlierEpochEndOffset.endOffset > previousOffsetToLeaderLocalLogStartOffset) {
+              // Always use the leader epoch from returned earlierEpochEndOffset.
+              // This gives the respective leader epoch, that will handle any gaps in epochs.
+              // For ex, leader epoch cache contains:
+              // leader-epoch   start-offset
+              //  0 		          20
+              //  1 		          85
+              //  <2> - gap no messages were appended in this leader epoch.
+              //  3 		          90
+              //  4 		          98
+              // There is a gap in leader epoch. For leaderLocalLogStartOffset as 90, leader-epoch is 3.
+              // fetchEarlierEpochEndOffset(2) will return leader-epoch as 1, end-offset as 90.
+              // So, for offset 89, we should return leader epoch as 1 like below.
+              earlierEpochEndOffset.leaderEpoch()
+            } else epochForLeaderLocalLogStartOffset
+          }
+        }
+
+        val maybeRlsm = rlm.fetchRemoteLogSegmentMetadata(partition, targetEpoch, previousOffsetToLeaderLocalLogStartOffset)
+
+        if (maybeRlsm.isPresent) {
+          val remoteLogSegmentMetadata = maybeRlsm.get()
+          // Build leader epoch cache, producer snapshots until remoteLogSegmentMetadata.endOffset() and start
+          // segments from (remoteLogSegmentMetadata.endOffset() + 1)
+          val nextOffset = remoteLogSegmentMetadata.endOffset() + 1
+          val epochStream = rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.LEADER_EPOCH)
+          val epochs = readLeaderEpochCheckpoint(epochStream)
+
+          // Truncate the existing local log before restoring the leader epoch cache and producer snapshots.
+          truncateFullyAndStartAt(partition, nextOffset)
+
+          log.maybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented)
+          log.leaderEpochCache.foreach { cache =>
+            epochs.foreach(epochEntry =>
+              // Do not flush to file for each entry.
+              cache.assign(epochEntry.epoch, epochEntry.startOffset, flushToFile = false)
+            )
+            // Flush the cache to the file.
+            cache.flush()
+          }
+
+          debug(s"Updated the epoch cache from remote tier till offset: $leaderLocalLogStartOffset " +
+            s"with size: ${epochs.size} for $partition")
+
+          // Restore producer snapshot
+          val snapshotFile = UnifiedLog.producerSnapshotFile(log.dir, nextOffset)
+          val tmpSnapshotFile = new File(snapshotFile.getAbsolutePath + ".tmp");
+          // Copy it to snapshot file in atomic manner.
+          Files.copy(rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT),
+            tmpSnapshotFile.toPath, StandardCopyOption.REPLACE_EXISTING)
+          Utils.atomicMoveWithFallback(tmpSnapshotFile.toPath, snapshotFile.toPath, false)
+
+          // Reload producer snapshots.
+          log.producerStateManager.reloadSnapshots()
+          log.loadProducerState(nextOffset, reloadFromCleanShutdown = false)
+          debug(s"Built the leader epoch cache and producer snapshots from remote tier for $partition. " +
+            s"Active producers: ${log.producerStateManager.activeProducers.size}, " +
+            s"LeaderLogStartOffset: $leaderLogStartOffset, endOffset: $nextOffset")
+
+          // Return the offset from which next fetch should happen.
+          nextOffset
+        } else {
+          throw new RemoteStorageException(s"Couldn't build the state from remote store for partition: $partition, " +
+            s"currentLeaderEpoch: $currentLeaderEpoch, leaderLocalLogStartOffset: $leaderLocalLogStartOffset, " +
+            s"leaderLogStartOffset: $leaderLogStartOffset, epoch: $targetEpoch as the previous remote log segment " +
+            s"metadata was not found")
+        }
+
+      } else {

Review Comment:
   @junrao We were doing that earlier, but you suggested doing a normal truncation up to the local-log-start-offset, as mentioned [here](https://github.com/apache/kafka/pull/11390#discussion_r772110978).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org