Posted to jira@kafka.apache.org by "satishd (via GitHub)" <gi...@apache.org> on 2023/04/18 16:09:20 UTC

[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.

satishd commented on code in PR #13535:
URL: https://github.com/apache/kafka/pull/13535#discussion_r1168261768


##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1243,6 +1327,33 @@ class ReplicaManager(val config: KafkaConfig,
     result
   }
 
+  def createLogReadResult(highWatermark: Long,

Review Comment:
   `createLogReadResult(e: Throwable)` cannot be made private as it is used in `DelayedRemoteFetch`. This method, however, will be used in the test classes that we are going to add in this PR or a follow-up PR.



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -600,6 +622,176 @@ public String toString() {
         }
     }
 
+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();

Review Comment:
   afaik, `lastFetchedEpoch` is the epoch of the last fetched record, which can be different from the epoch of the fetch offset. We should find the epoch for the target offset and use that to find the remote log segment metadata.
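   To make the distinction concrete, here is a sketch (mirroring the updated code in the later hunk, not an authoritative implementation): the epoch passed to the segment-metadata lookup is resolved from the leader epoch cache at the target fetch offset, rather than taken from `fetchInfo.lastFetchedEpoch`.

   ```java
   // Sketch: resolve the leader epoch of the target offset itself and use it
   // to locate the remote log segment metadata. fetchInfo.lastFetchedEpoch is
   // the epoch of the last record the fetcher read, which may differ.
   OptionalInt epoch = OptionalInt.empty();
   Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
   if (logOptional.isPresent()) {
       Option<LeaderEpochFileCache> leaderEpochCache = logOptional.get().leaderEpochCache();
       if (leaderEpochCache.isDefined()) {
           epoch = leaderEpochCache.get().epochForOffset(offset);
       }
   }
   Optional<RemoteLogSegmentMetadata> rlsMetadata = epoch.isPresent()
           ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
           : Optional.empty();
   ```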
   



##########
core/src/main/java/kafka/log/remote/RemoteLogReader.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;

Review Comment:
   `RemoteLogReader` cannot be moved to the storage module as it currently depends on `RemoteLogManager`; I will move it along with `RemoteLogManager` later.
   `RemoteLogReadResult` and `RemoteStorageThreadPool` have been moved to the storage module.



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1083,48 +1095,100 @@ class ReplicaManager(val config: KafkaConfig,
           fetchPartitionStatus += (topicIdPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
         })
       }
-      val delayedFetch = new DelayedFetch(
-        params = params,
-        fetchPartitionStatus = fetchPartitionStatus,
-        replicaManager = this,
-        quota = quota,
-        responseCallback = responseCallback
-      )
-
-      // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
-      val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) }
-
-      // try to complete the request immediately, otherwise put it into the purgatory;
-      // this is because while the delayed fetch operation is being created, new requests
-      // may arrive and hence make this operation completable.
-      delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
+
+      if (remoteFetchInfo.isPresent) {

Review Comment:
   I did not understand the comment here.  



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -600,6 +622,176 @@ public String toString() {
         }
     }
 
+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();
+
+        if (logOptional.isPresent()) {
+            Option<LeaderEpochFileCache> leaderEpochCache = logOptional.get().leaderEpochCache();
+            if (leaderEpochCache.isDefined()) {
+                epoch = leaderEpochCache.get().epochForOffset(offset);
+            }
+        }
+
+        Optional<RemoteLogSegmentMetadata> rlsMetadata = epoch.isPresent()
+                ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+                : Optional.empty();
+
+        if (!rlsMetadata.isPresent()) {
+            String epochStr = (epoch.isPresent()) ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+            throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch "
+                    + epochStr + " and partition " + tp + " which does not exist in remote tier.");
+        }
+
+        int startPos = lookupPositionForOffset(rlsMetadata.get(), offset);
+        InputStream remoteSegInputStream = null;
+        try {
+            // Search forward for the position of the last offset that is greater than or equal to the target offset
+            remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(rlsMetadata.get(), startPos);
+            RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream);
+
+            RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset);
+
+            if (firstBatch == null)
+                return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false,
+                        includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty());
+
+            int updatedFetchSize =
+                    remoteStorageFetchInfo.minOneMessage && firstBatch.sizeInBytes() > maxBytes
+                            ? firstBatch.sizeInBytes() : maxBytes;
+
+            ByteBuffer buffer = ByteBuffer.allocate(updatedFetchSize);
+            int remainingBytes = updatedFetchSize;
+
+            firstBatch.writeTo(buffer);
+            remainingBytes -= firstBatch.sizeInBytes();
+
+            if (remainingBytes > 0) {
+                // input stream is read till (startPos - 1) while getting the batch of records earlier.
+                // read the input stream until min of (EOF stream or buffer's remaining capacity).
+                Utils.readFully(remoteSegInputStream, buffer);
+            }
+            buffer.flip();
+
+            FetchDataInfo fetchDataInfo = new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.readableRecords(buffer));
+            if (includeAbortedTxns) {
+                fetchDataInfo = addAbortedTransactions(firstBatch.baseOffset(), rlsMetadata.get(), fetchDataInfo);
+            }
+
+            return fetchDataInfo;
+        } finally {
+            Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream");
+        }
+    }
+
+    private int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) {
+        return indexCache.lookupOffset(remoteLogSegmentMetadata, offset);
+    }
+
+    private FetchDataInfo addAbortedTransactions(long startOffset,
+                                         RemoteLogSegmentMetadata segmentMetadata,
+                                         FetchDataInfo fetchInfo) throws RemoteStorageException {
+        int fetchSize = fetchInfo.records.sizeInBytes();
+        OffsetPosition startOffsetPosition = new OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset,
+                fetchInfo.fetchOffsetMetadata.relativePositionInSegment);
+
+        OffsetIndex offsetIndex = indexCache.getIndexEntry(segmentMetadata).offsetIndex();
+        long upperBoundOffset = offsetIndex.fetchUpperBoundOffset(startOffsetPosition, fetchSize)
+                .map(x -> x.offset).orElse(segmentMetadata.endOffset() + 1);
+
+        final List<FetchResponseData.AbortedTransaction> abortedTransactions = new ArrayList<>();
+
+        Consumer<List<AbortedTxn>> accumulator =
+                abortedTxns -> abortedTransactions.addAll(abortedTxns.stream()
+                        .map(AbortedTxn::asAbortedTransaction).collect(Collectors.toList()));
+
+        collectAbortedTransactions(startOffset, upperBoundOffset, segmentMetadata, accumulator);
+
+        return new FetchDataInfo(fetchInfo.fetchOffsetMetadata,
+                fetchInfo.records,
+                fetchInfo.firstEntryIncomplete,
+                Optional.of(abortedTransactions));
+    }
+
+    private void collectAbortedTransactions(long startOffset,
+                                    long upperBoundOffset,
+                                    RemoteLogSegmentMetadata segmentMetadata,
+                                    Consumer<List<AbortedTxn>> accumulator) throws RemoteStorageException {
+        TopicPartition topicPartition = segmentMetadata.topicIdPartition().topicPartition();
+        Iterator<LogSegment> localLogSegments = fetchLog.apply(topicPartition)
+                .map(log -> JavaConverters.asJavaCollection(log.logSegments()))
+                .map(Collection::iterator)
+                .orElse(Collections.emptyIterator());
+
+        boolean searchInLocalLog = false;
+        Optional<RemoteLogSegmentMetadata> nextSegmentMetadataOpt = Optional.of(segmentMetadata);
+        Optional<TransactionIndex> txnIndexOpt = nextSegmentMetadataOpt.map(metadata -> indexCache.getIndexEntry(metadata).txnIndex());
+
+        while (txnIndexOpt.isPresent()) {
+            TxnIndexSearchResult searchResult = txnIndexOpt.get().collectAbortedTxns(startOffset, upperBoundOffset);
+            accumulator.accept(searchResult.abortedTransactions);
+            if (!searchResult.isComplete) {
+                if (!searchInLocalLog) {
+                    nextSegmentMetadataOpt = findNextSegmentMetadata(nextSegmentMetadataOpt.get());
+
+                    txnIndexOpt = nextSegmentMetadataOpt.map(x -> indexCache.getIndexEntry(x).txnIndex());
+                    if (!txnIndexOpt.isPresent()) {
+                        searchInLocalLog = true;
+                    }
+                }
+
+                if (searchInLocalLog) {
+                    txnIndexOpt = (localLogSegments.hasNext()) ? Optional.of(localLogSegments.next().txnIndex()) : Optional.empty();

Review Comment:
   Right, it can have duplicates, but the consumer already handles duplicate aborted transactions. I updated the code to remove duplicates in case any consumer implementation cannot handle them.
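   For illustration, a minimal sketch of one way to drop the duplicates (assuming the generated `FetchResponseData.AbortedTransaction` class provides `equals`/`hashCode`; the helper name is illustrative, not the exact change in the PR):

   ```java
   import java.util.ArrayList;
   import java.util.LinkedHashSet;
   import java.util.List;

   import org.apache.kafka.common.message.FetchResponseData;

   // Illustrative only: deduplicate the accumulated aborted transactions while
   // preserving encounter order, relying on the generated equals()/hashCode().
   static List<FetchResponseData.AbortedTransaction> deduplicate(
           List<FetchResponseData.AbortedTransaction> abortedTransactions) {
       return new ArrayList<>(new LinkedHashSet<>(abortedTransactions));
   }
   ```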


