You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Hangleton (via GitHub)" <gi...@apache.org> on 2023/04/14 14:23:09 UTC

[GitHub] [kafka] Hangleton commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

Hangleton commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1166846747


##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -600,6 +622,176 @@ public String toString() {
         }
     }
 
+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();
+
+        if (logOptional.isPresent()) {
+            Option<LeaderEpochFileCache> leaderEpochCache = logOptional.get().leaderEpochCache();
+            if (leaderEpochCache.isDefined()) {
+                epoch = leaderEpochCache.get().epochForOffset(offset);
+            }
+        }
+
+        Optional<RemoteLogSegmentMetadata> rlsMetadata = epoch.isPresent()
+                ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+                : Optional.empty();
+
+        if (!rlsMetadata.isPresent()) {
+            String epochStr = (epoch.isPresent()) ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+            throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch "
+                    + epochStr + " and partition " + tp + " which does not exist in remote tier.");
+        }
+
+        int startPos = lookupPositionForOffset(rlsMetadata.get(), offset);
+        InputStream remoteSegInputStream = null;
+        try {
+            // Search forward for the position of the last offset that is greater than or equal to the target offset
+            remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(rlsMetadata.get(), startPos);
+            RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream);
+
+            RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset);
+
+            if (firstBatch == null)
+                return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false,
+                        includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty());
+
+            int updatedFetchSize =

Review Comment:
   Is there a risk here to overflow the `buffer` if it is allocated up to `maxBytes` and the first batch is greater than that?
   
   This could happen if the partition read here is not the first one from the Fetch request in which case `minOneMessage` is false [*]. In the local counterpart, an empty set of records is returned ([source](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1247-L1250), where `firstEntryIncomplete` describes the case where (using the equivalent notation used here) `minOneMessage` was false and `firstBatch.sizeInBytes() > maxBytes`).
   
   [*] Even if only one remote partition is served per Fetch request, there can still be another partition from that request served from its local replica log hence a possibility for not at least one message requested to be read here.



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -600,6 +622,176 @@ public String toString() {
         }
     }
 
+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();
+
+        if (logOptional.isPresent()) {
+            Option<LeaderEpochFileCache> leaderEpochCache = logOptional.get().leaderEpochCache();
+            if (leaderEpochCache.isDefined()) {
+                epoch = leaderEpochCache.get().epochForOffset(offset);
+            }
+        }
+
+        Optional<RemoteLogSegmentMetadata> rlsMetadata = epoch.isPresent()
+                ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+                : Optional.empty();
+
+        if (!rlsMetadata.isPresent()) {
+            String epochStr = (epoch.isPresent()) ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+            throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch "
+                    + epochStr + " and partition " + tp + " which does not exist in remote tier.");
+        }
+
+        int startPos = lookupPositionForOffset(rlsMetadata.get(), offset);
+        InputStream remoteSegInputStream = null;
+        try {
+            // Search forward for the position of the last offset that is greater than or equal to the target offset
+            remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(rlsMetadata.get(), startPos);
+            RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream);
+
+            RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset);
+
+            if (firstBatch == null)
+                return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false,
+                        includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty());
+
+            int updatedFetchSize =
+                    remoteStorageFetchInfo.minOneMessage && firstBatch.sizeInBytes() > maxBytes
+                            ? firstBatch.sizeInBytes() : maxBytes;
+
+            ByteBuffer buffer = ByteBuffer.allocate(updatedFetchSize);
+            int remainingBytes = updatedFetchSize;
+
+            firstBatch.writeTo(buffer);
+            remainingBytes -= firstBatch.sizeInBytes();
+
+            if (remainingBytes > 0) {
+                // input stream is read till (startPos - 1) while getting the batch of records earlier.
+                // read the input stream until min of (EOF stream or buffer's remaining capacity).
+                Utils.readFully(remoteSegInputStream, buffer);
+            }
+            buffer.flip();
+
+            FetchDataInfo fetchDataInfo = new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.readableRecords(buffer));
+            if (includeAbortedTxns) {
+                fetchDataInfo = addAbortedTransactions(firstBatch.baseOffset(), rlsMetadata.get(), fetchDataInfo);
+            }
+
+            return fetchDataInfo;
+        } finally {
+            Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream");
+        }
+    }
+
+    private int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) {
+        return indexCache.lookupOffset(remoteLogSegmentMetadata, offset);
+    }
+
+    private FetchDataInfo addAbortedTransactions(long startOffset,
+                                         RemoteLogSegmentMetadata segmentMetadata,
+                                         FetchDataInfo fetchInfo) throws RemoteStorageException {
+        int fetchSize = fetchInfo.records.sizeInBytes();
+        OffsetPosition startOffsetPosition = new OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset,
+                fetchInfo.fetchOffsetMetadata.relativePositionInSegment);
+
+        OffsetIndex offsetIndex = indexCache.getIndexEntry(segmentMetadata).offsetIndex();
+        long upperBoundOffset = offsetIndex.fetchUpperBoundOffset(startOffsetPosition, fetchSize)
+                .map(x -> x.offset).orElse(segmentMetadata.endOffset() + 1);
+
+        final List<FetchResponseData.AbortedTransaction> abortedTransactions = new ArrayList<>();
+
+        Consumer<List<AbortedTxn>> accumulator =
+                abortedTxns -> abortedTransactions.addAll(abortedTxns.stream()
+                        .map(AbortedTxn::asAbortedTransaction).collect(Collectors.toList()));
+
+        collectAbortedTransactions(startOffset, upperBoundOffset, segmentMetadata, accumulator);
+
+        return new FetchDataInfo(fetchInfo.fetchOffsetMetadata,
+                fetchInfo.records,
+                fetchInfo.firstEntryIncomplete,
+                Optional.of(abortedTransactions));
+    }
+
+    private void collectAbortedTransactions(long startOffset,
+                                    long upperBoundOffset,
+                                    RemoteLogSegmentMetadata segmentMetadata,
+                                    Consumer<List<AbortedTxn>> accumulator) throws RemoteStorageException {
+        TopicPartition topicPartition = segmentMetadata.topicIdPartition().topicPartition();
+        Iterator<LogSegment> localLogSegments = fetchLog.apply(topicPartition)
+                .map(log -> JavaConverters.asJavaCollection(log.logSegments()))
+                .map(Collection::iterator)
+                .orElse(Collections.emptyIterator());
+
+        boolean searchInLocalLog = false;
+        Optional<RemoteLogSegmentMetadata> nextSegmentMetadataOpt = Optional.of(segmentMetadata);
+        Optional<TransactionIndex> txnIndexOpt = nextSegmentMetadataOpt.map(metadata -> indexCache.getIndexEntry(metadata).txnIndex());
+
+        while (txnIndexOpt.isPresent()) {
+            TxnIndexSearchResult searchResult = txnIndexOpt.get().collectAbortedTxns(startOffset, upperBoundOffset);
+            accumulator.accept(searchResult.abortedTransactions);
+            if (!searchResult.isComplete) {
+                if (!searchInLocalLog) {
+                    nextSegmentMetadataOpt = findNextSegmentMetadata(nextSegmentMetadataOpt.get());
+
+                    txnIndexOpt = nextSegmentMetadataOpt.map(x -> indexCache.getIndexEntry(x).txnIndex());

Review Comment:
   There may be an availability risk here. The remote index cache allows up to 1,024 entries by default and it is possible to keep a transaction index entry permanently rotated in and out of the cache so that retries of Fetch requests for a partition at a given (constant) offset permanently trigger a download of the index. This, plus the fact there is a global lock for read and write access on the remote index cache, can lead to lock contention on the remote log reader thread pool. In the worst case, it is possible for a partition to be prevented from making progress on the fetch path.



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1118,9 +1122,13 @@ class ReplicaManager(val config: KafkaConfig,
     responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit
   ): Unit = {
     // check if this fetch request can be satisfied right away
-    val logReadResults = readFromLocalLog(params, fetchInfos, quota, readFromPurgatory = false)
+    val logReadResults = readFromLog(params, fetchInfos, quota, readFromPurgatory = false)
     var bytesReadable: Long = 0
     var errorReadingData = false
+
+    // The 1st topic-partition that has to be read from remote storage
+    var remoteFetchInfo: Optional[RemoteStorageFetchInfo] = Optional.empty()

Review Comment:
   Agreed - there are consumption patterns which diverge from the local case with this approach (that is, uneven progress across the partitions consumed from a topic [with said partitions of the same nature w.r.t. record batch size and overall size]).
   
   It may be preferable not to diverge from the local approach and read from all the remote partitions found in the `fetchInfos`. Then, a different read pattern which provides greater performance for a specific operational environment and workload could be enforced via a configuration property. 



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1273,17 +1328,45 @@ class ReplicaManager(val config: KafkaConfig,
                  _: FencedLeaderEpochException |
                  _: ReplicaNotAvailableException |
                  _: KafkaStorageException |
-                 _: OffsetOutOfRangeException |
                  _: InconsistentTopicIdException) =>
-          LogReadResult(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
-            divergingEpoch = None,
-            highWatermark = UnifiedLog.UnknownOffset,
-            leaderLogStartOffset = UnifiedLog.UnknownOffset,
-            leaderLogEndOffset = UnifiedLog.UnknownOffset,
-            followerLogStartOffset = UnifiedLog.UnknownOffset,
-            fetchTimeMs = -1L,
-            lastStableOffset = None,
-            exception = Some(e))
+          createLogReadResult(e)
+        case e: OffsetOutOfRangeException =>

Review Comment:
   I assume eventually we won't use exception-based branch control and not rely on this exception to be re-directed to the remote read code path?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org