Posted to commits@iceberg.apache.org by ja...@apache.org on 2023/04/22 17:24:56 UTC

[iceberg] branch master updated: Spark 3.3: support rate limit in Spark Streaming (#4479)

This is an automated email from the ASF dual-hosted git repository.

jackye pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 029622b420 Spark 3.3: support rate limit in Spark Streaming (#4479)
029622b420 is described below

commit 029622b42091a17ca386977dba4a68f511ffba21
Author: Prashant Singh <35...@users.noreply.github.com>
AuthorDate: Sat Apr 22 10:24:49 2023 -0700

    Spark 3.3: support rate limit in Spark Streaming (#4479)
---
 .../main/java/org/apache/iceberg/MicroBatches.java | 209 ++++++++++++---------
 .../org/apache/iceberg/TestMicroBatchBuilder.java  |  20 +-
 .../org/apache/iceberg/spark/SparkReadConf.java    |  16 ++
 .../org/apache/iceberg/spark/SparkReadOptions.java |   7 +
 .../spark/source/SparkMicroBatchStream.java        | 169 +++++++++++++++--
 .../spark/source/TestStructuredStreamingRead3.java |  76 +++++++-
 6 files changed, 384 insertions(+), 113 deletions(-)
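
For context, a minimal usage sketch of the new rate-limit read options (not part of this commit; the table identifier, checkpoint path, and class name below are placeholders chosen for illustration):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.streaming.StreamingQuery;

    public class RateLimitedIcebergStreamExample {
      public static void main(String[] args) throws Exception {
        SparkSession spark =
            SparkSession.builder().appName("iceberg-rate-limited-stream").getOrCreate();

        // Read an Iceberg table as a stream, capping each micro-batch at 10 files and 10000 rows.
        Dataset<Row> stream =
            spark
                .readStream()
                .format("iceberg")
                .option("streaming-max-files-per-micro-batch", "10")
                .option("streaming-max-rows-per-micro-batch", "10000")
                .load("db.events");

        // Write each micro-batch to the console; the checkpoint tracks the streaming offsets.
        StreamingQuery query =
            stream
                .writeStream()
                .format("console")
                .option("checkpointLocation", "/tmp/checkpoints/db.events")
                .start();

        query.awaitTermination();
      }
    }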

diff --git a/core/src/main/java/org/apache/iceberg/MicroBatches.java b/core/src/main/java/org/apache/iceberg/MicroBatches.java
index e066e1b314..d96246f15b 100644
--- a/core/src/main/java/org/apache/iceberg/MicroBatches.java
+++ b/core/src/main/java/org/apache/iceberg/MicroBatches.java
@@ -28,6 +28,7 @@ import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.util.Pair;
 import org.slf4j.Logger;
@@ -36,6 +37,96 @@ import org.slf4j.LoggerFactory;
 public class MicroBatches {
   private MicroBatches() {}
 
+  public static List<Pair<ManifestFile, Integer>> skippedManifestIndexesFromSnapshot(
+      FileIO io, Snapshot snapshot, long startFileIndex, boolean scanAllFiles) {
+    List<ManifestFile> manifests =
+        scanAllFiles
+            ? snapshot.dataManifests(io)
+            : snapshot.dataManifests(io).stream()
+                .filter(m -> m.snapshotId().equals(snapshot.snapshotId()))
+                .collect(Collectors.toList());
+
+    List<Pair<ManifestFile, Integer>> manifestIndexes = indexManifests(manifests);
+
+    return skipManifests(manifestIndexes, startFileIndex);
+  }
+
+  public static CloseableIterable<FileScanTask> openManifestFile(
+      FileIO io,
+      Map<Integer, PartitionSpec> specsById,
+      boolean caseSensitive,
+      Snapshot snapshot,
+      ManifestFile manifestFile,
+      boolean scanAllFiles) {
+
+    ManifestGroup manifestGroup =
+        new ManifestGroup(io, ImmutableList.of(manifestFile))
+            .specsById(specsById)
+            .caseSensitive(caseSensitive);
+    if (!scanAllFiles) {
+      manifestGroup =
+          manifestGroup
+              .filterManifestEntries(
+                  entry ->
+                      entry.snapshotId() == snapshot.snapshotId()
+                          && entry.status() == ManifestEntry.Status.ADDED)
+              .ignoreDeleted();
+    }
+
+    return manifestGroup.planFiles();
+  }
+
+  /**
+   * Method to index the data files for each manifest. For example, if manifest m1 has 3 data files,
+   * manifest m2 has 2 data files, manifest m3 has 1 data file, then the index will be (m1, 0), (m2,
+   * 3), (m3, 5).
+   *
+   * @param manifestFiles List of input manifests used to index.
+   * @return a list pairing each manifest with the index number of the first data file entry in that
+   *     manifest.
+   */
+  private static List<Pair<ManifestFile, Integer>> indexManifests(
+      List<ManifestFile> manifestFiles) {
+    int currentFileIndex = 0;
+    List<Pair<ManifestFile, Integer>> manifestIndexes = Lists.newArrayList();
+
+    for (ManifestFile manifest : manifestFiles) {
+      manifestIndexes.add(Pair.of(manifest, currentFileIndex));
+      currentFileIndex += manifest.addedFilesCount() + manifest.existingFilesCount();
+    }
+
+    return manifestIndexes;
+  }
+
+  /**
+   * Method to skip manifests whose data files all come before startFileIndex. For example, if
+   * the index list is: (m1, 0), (m2, 3), (m3, 5) and startFileIndex is 4, then the returned
+   * manifest index list is: (m2, 3), (m3, 5).
+   *
+   * @param indexedManifests List of indexed input manifests.
+   * @param startFileIndex Global index of the first file to include; manifests whose files all
+   *     come before this index are skipped.
+   * @return the sub-list of indexed manifests starting from the manifest that contains the file
+   *     at startFileIndex.
+   */
+  private static List<Pair<ManifestFile, Integer>> skipManifests(
+      List<Pair<ManifestFile, Integer>> indexedManifests, long startFileIndex) {
+    if (startFileIndex == 0) {
+      return indexedManifests;
+    }
+
+    int manifestIndex = 0;
+    for (Pair<ManifestFile, Integer> manifest : indexedManifests) {
+      if (manifest.second() > startFileIndex) {
+        break;
+      }
+
+      manifestIndex++;
+    }
+
+    return indexedManifests.subList(Math.max(manifestIndex - 1, 0), indexedManifests.size());
+  }
+
   public static class MicroBatch {
     private final long snapshotId;
     private final long startFileIndex;
@@ -113,73 +204,27 @@ public class MicroBatches {
     }
 
     public MicroBatch generate(long startFileIndex, long targetSizeInBytes, boolean scanAllFiles) {
+      return generate(
+          startFileIndex,
+          Iterables.size(snapshot.addedDataFiles(io)),
+          targetSizeInBytes,
+          scanAllFiles);
+    }
+
+    public MicroBatch generate(
+        long startFileIndex, long endFileIndex, long targetSizeInBytes, boolean scanAllFiles) {
+      Preconditions.checkArgument(endFileIndex >= 0, "endFileIndex is unexpectedly smaller than 0");
       Preconditions.checkArgument(
           startFileIndex >= 0, "startFileIndex is unexpectedly smaller than 0");
       Preconditions.checkArgument(
           targetSizeInBytes > 0, "targetSizeInBytes should be larger than 0");
 
-      List<ManifestFile> manifests =
-          scanAllFiles
-              ? snapshot.dataManifests(io)
-              : snapshot.dataManifests(io).stream()
-                  .filter(m -> m.snapshotId().equals(snapshot.snapshotId()))
-                  .collect(Collectors.toList());
-
-      List<Pair<ManifestFile, Integer>> manifestIndexes = indexManifests(manifests);
-      List<Pair<ManifestFile, Integer>> skippedManifestIndexes =
-          skipManifests(manifestIndexes, startFileIndex);
-
       return generateMicroBatch(
-          skippedManifestIndexes, startFileIndex, targetSizeInBytes, scanAllFiles);
-    }
-
-    /**
-     * Method to index the data files for each manifest. For example, if manifest m1 has 3 data
-     * files, manifest m2 has 2 data files, manifest m3 has 1 data file, then the index will be (m1,
-     * 0), (m2, 3), (m3, 5).
-     *
-     * @param manifestFiles List of input manifests used to index.
-     * @return a list of manifest index with key as manifest file, value as file counts.
-     */
-    private static List<Pair<ManifestFile, Integer>> indexManifests(
-        List<ManifestFile> manifestFiles) {
-      int currentFileIndex = 0;
-      List<Pair<ManifestFile, Integer>> manifestIndexes = Lists.newArrayList();
-
-      for (ManifestFile manifest : manifestFiles) {
-        manifestIndexes.add(Pair.of(manifest, currentFileIndex));
-        currentFileIndex += manifest.addedFilesCount() + manifest.existingFilesCount();
-      }
-
-      return manifestIndexes;
-    }
-
-    /**
-     * Method to skip the manifest file in which the index is smaller than startFileIndex. For
-     * example, if the index list is : (m1, 0), (m2, 3), (m3, 5), and startFileIndex is 4, then the
-     * returned manifest index list is: (m2, 3), (m3, 5).
-     *
-     * @param indexedManifests List of input manifests.
-     * @param startFileIndex Index used to skip the processed manifests.
-     * @return a sub-list of manifest file index which only contains the manifest indexes larger
-     *     than the startFileIndex.
-     */
-    private static List<Pair<ManifestFile, Integer>> skipManifests(
-        List<Pair<ManifestFile, Integer>> indexedManifests, long startFileIndex) {
-      if (startFileIndex == 0) {
-        return indexedManifests;
-      }
-
-      int manifestIndex = 0;
-      for (Pair<ManifestFile, Integer> manifest : indexedManifests) {
-        if (manifest.second() > startFileIndex) {
-          break;
-        }
-
-        manifestIndex++;
-      }
-
-      return indexedManifests.subList(manifestIndex - 1, indexedManifests.size());
+          skippedManifestIndexesFromSnapshot(io, snapshot, startFileIndex, scanAllFiles),
+          startFileIndex,
+          endFileIndex,
+          targetSizeInBytes,
+          scanAllFiles);
     }
 
     /**
@@ -188,25 +233,23 @@ public class MicroBatches {
      *
      * @param indexedManifests A list of indexed manifests to generate MicroBatch
      * @param startFileIndex A startFileIndex used to skip processed files.
+     * @param endFileIndex Exclusive end of the file index range to include.
      * @param targetSizeInBytes Used to control the size of MicroBatch, the processed file bytes
      *     must be smaller than this size.
      * @param scanAllFiles Used to check whether all the data files should be processed, or only
      *     added files.
      * @return A MicroBatch.
      */
+    @SuppressWarnings("checkstyle:CyclomaticComplexity")
     private MicroBatch generateMicroBatch(
         List<Pair<ManifestFile, Integer>> indexedManifests,
         long startFileIndex,
+        long endFileIndex,
         long targetSizeInBytes,
         boolean scanAllFiles) {
       if (indexedManifests.isEmpty()) {
         return new MicroBatch(
-            snapshot.snapshotId(),
-            startFileIndex,
-            startFileIndex + 1,
-            0L,
-            Collections.emptyList(),
-            true);
+            snapshot.snapshotId(), startFileIndex, endFileIndex, 0L, Collections.emptyList(), true);
       }
 
       long currentSizeInBytes = 0L;
@@ -218,11 +261,18 @@ public class MicroBatches {
         currentFileIndex = indexedManifests.get(idx).second();
 
         try (CloseableIterable<FileScanTask> taskIterable =
-                open(indexedManifests.get(idx).first(), scanAllFiles);
+                openManifestFile(
+                    io,
+                    specsById,
+                    caseSensitive,
+                    snapshot,
+                    indexedManifests.get(idx).first(),
+                    scanAllFiles);
             CloseableIterator<FileScanTask> taskIter = taskIterable.iterator()) {
           while (taskIter.hasNext()) {
             FileScanTask task = taskIter.next();
-            if (currentFileIndex >= startFileIndex) {
+            // want to read [startFileIndex ... endFileIndex)
+            if (currentFileIndex >= startFileIndex && currentFileIndex < endFileIndex) {
               // Make sure there's at least one task in each MicroBatch to avoid the job
               // getting stuck, so always add the
               // first task.
@@ -231,7 +281,7 @@ public class MicroBatches {
             }
 
             currentFileIndex++;
-            if (currentSizeInBytes >= targetSizeInBytes) {
+            if (currentSizeInBytes >= targetSizeInBytes || currentFileIndex >= endFileIndex) {
               break;
             }
           }
@@ -259,6 +309,7 @@ public class MicroBatches {
         }
       }
 
+      // [startFileIndex ....currentFileIndex)
       return new MicroBatch(
           snapshot.snapshotId(),
           startFileIndex,
@@ -267,23 +318,5 @@ public class MicroBatches {
           tasks,
           isLastIndex);
     }
-
-    private CloseableIterable<FileScanTask> open(ManifestFile manifestFile, boolean scanAllFiles) {
-      ManifestGroup manifestGroup =
-          new ManifestGroup(io, ImmutableList.of(manifestFile))
-              .specsById(specsById)
-              .caseSensitive(caseSensitive);
-      if (!scanAllFiles) {
-        manifestGroup =
-            manifestGroup
-                .filterManifestEntries(
-                    entry ->
-                        entry.snapshotId() == snapshot.snapshotId()
-                            && entry.status() == ManifestEntry.Status.ADDED)
-                .ignoreDeleted();
-      }
-
-      return manifestGroup.planFiles();
-    }
   }
 }
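
As a reading aid, a small standalone sketch of the indexing and skipping arithmetic described in the javadocs above, using the (m1, 0), (m2, 3), (m3, 5) example. Illustration only, not Iceberg API; plain ints stand in for ManifestFile:

    import java.util.ArrayList;
    import java.util.List;

    public class ManifestIndexSketch {
      public static void main(String[] args) {
        int[] fileCounts = {3, 2, 1}; // m1 has 3 data files, m2 has 2, m3 has 1

        // index each manifest by the global index of its first file: (m1, 0), (m2, 3), (m3, 5)
        List<int[]> indexes = new ArrayList<>();
        int current = 0;
        for (int i = 0; i < fileCounts.length; i++) {
          indexes.add(new int[] {i, current});
          current += fileCounts[i];
        }

        // skip manifests that end before startFileIndex; keep the one containing it
        long startFileIndex = 4;
        int pos = 0;
        for (int[] entry : indexes) {
          if (entry[1] > startFileIndex) {
            break;
          }
          pos++;
        }
        List<int[]> remaining = indexes.subList(Math.max(pos - 1, 0), indexes.size());

        for (int[] entry : remaining) {
          System.out.printf("manifest m%d starts at file index %d%n", entry[0] + 1, entry[1]);
        }
        // prints m2 (start index 3) and m3 (start index 5), matching the javadoc example
      }
    }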
diff --git a/core/src/test/java/org/apache/iceberg/TestMicroBatchBuilder.java b/core/src/test/java/org/apache/iceberg/TestMicroBatchBuilder.java
index b907a50319..deb6e7c8ad 100644
--- a/core/src/test/java/org/apache/iceberg/TestMicroBatchBuilder.java
+++ b/core/src/test/java/org/apache/iceberg/TestMicroBatchBuilder.java
@@ -52,7 +52,7 @@ public class TestMicroBatchBuilder extends TableTestBase {
     MicroBatch batch =
         MicroBatches.from(table.snapshot(1L), table.io())
             .specsById(table.specs())
-            .generate(0, Long.MAX_VALUE, true);
+            .generate(0, 6, Long.MAX_VALUE, true);
     Assert.assertEquals(batch.snapshotId(), 1L);
     Assert.assertEquals(batch.startFileIndex(), 0);
     Assert.assertEquals(batch.endFileIndex(), 5);
@@ -63,7 +63,7 @@ public class TestMicroBatchBuilder extends TableTestBase {
     MicroBatch batch1 =
         MicroBatches.from(table.snapshot(1L), table.io())
             .specsById(table.specs())
-            .generate(0, 15L, true);
+            .generate(0, 1, 15L, true);
     Assert.assertEquals(batch1.endFileIndex(), 1);
     Assert.assertEquals(batch1.sizeInBytes(), 10);
     Assert.assertFalse(batch1.lastIndexOfSnapshot());
@@ -72,7 +72,7 @@ public class TestMicroBatchBuilder extends TableTestBase {
     MicroBatch batch2 =
         MicroBatches.from(table.snapshot(1L), table.io())
             .specsById(table.specs())
-            .generate(batch1.endFileIndex(), 30L, true);
+            .generate(batch1.endFileIndex(), 4, 30L, true);
     Assert.assertEquals(batch2.endFileIndex(), 4);
     Assert.assertEquals(batch2.sizeInBytes(), 30);
     Assert.assertFalse(batch2.lastIndexOfSnapshot());
@@ -81,7 +81,7 @@ public class TestMicroBatchBuilder extends TableTestBase {
     MicroBatch batch3 =
         MicroBatches.from(table.snapshot(1L), table.io())
             .specsById(table.specs())
-            .generate(batch2.endFileIndex(), 50L, true);
+            .generate(batch2.endFileIndex(), 5, 50L, true);
     Assert.assertEquals(batch3.endFileIndex(), 5);
     Assert.assertEquals(batch3.sizeInBytes(), 10);
     Assert.assertTrue(batch3.lastIndexOfSnapshot());
@@ -95,7 +95,7 @@ public class TestMicroBatchBuilder extends TableTestBase {
     MicroBatch batch =
         MicroBatches.from(table.snapshot(1L), table.io())
             .specsById(table.specs())
-            .generate(0, 10L, true);
+            .generate(0, 1, 10L, true);
     Assert.assertEquals(batch.snapshotId(), 1L);
     Assert.assertEquals(batch.startFileIndex(), 0);
     Assert.assertEquals(batch.endFileIndex(), 1);
@@ -106,7 +106,7 @@ public class TestMicroBatchBuilder extends TableTestBase {
     MicroBatch batch1 =
         MicroBatches.from(table.snapshot(1L), table.io())
             .specsById(table.specs())
-            .generate(batch.endFileIndex(), 5L, true);
+            .generate(batch.endFileIndex(), 2, 5L, true);
     Assert.assertEquals(batch1.endFileIndex(), 2);
     Assert.assertEquals(batch1.sizeInBytes(), 10);
     filesMatch(Lists.newArrayList("B"), filesToScan(batch1.tasks()));
@@ -115,7 +115,7 @@ public class TestMicroBatchBuilder extends TableTestBase {
     MicroBatch batch2 =
         MicroBatches.from(table.snapshot(1L), table.io())
             .specsById(table.specs())
-            .generate(batch1.endFileIndex(), 10L, true);
+            .generate(batch1.endFileIndex(), 3, 10L, true);
     Assert.assertEquals(batch2.endFileIndex(), 3);
     Assert.assertEquals(batch2.sizeInBytes(), 10);
     filesMatch(Lists.newArrayList("C"), filesToScan(batch2.tasks()));
@@ -124,7 +124,7 @@ public class TestMicroBatchBuilder extends TableTestBase {
     MicroBatch batch3 =
         MicroBatches.from(table.snapshot(1L), table.io())
             .specsById(table.specs())
-            .generate(batch2.endFileIndex(), 10L, true);
+            .generate(batch2.endFileIndex(), 4, 10L, true);
     Assert.assertEquals(batch3.endFileIndex(), 4);
     Assert.assertEquals(batch3.sizeInBytes(), 10);
     filesMatch(Lists.newArrayList("D"), filesToScan(batch3.tasks()));
@@ -133,7 +133,7 @@ public class TestMicroBatchBuilder extends TableTestBase {
     MicroBatch batch4 =
         MicroBatches.from(table.snapshot(1L), table.io())
             .specsById(table.specs())
-            .generate(batch3.endFileIndex(), 5L, true);
+            .generate(batch3.endFileIndex(), 5, 5L, true);
     Assert.assertEquals(batch4.endFileIndex(), 5);
     Assert.assertEquals(batch4.sizeInBytes(), 10);
     filesMatch(Lists.newArrayList("E"), filesToScan(batch4.tasks()));
@@ -142,7 +142,7 @@ public class TestMicroBatchBuilder extends TableTestBase {
     MicroBatch batch5 =
         MicroBatches.from(table.snapshot(1L), table.io())
             .specsById(table.specs())
-            .generate(batch4.endFileIndex(), 5L, true);
+            .generate(batch4.endFileIndex(), 5, 5L, true);
     Assert.assertEquals(batch5.endFileIndex(), 5);
     Assert.assertEquals(batch5.sizeInBytes(), 0);
     Assert.assertTrue(Iterables.isEmpty(batch5.tasks()));
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index 1c1182c4da..dbd2613dde 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -255,6 +255,22 @@ public class SparkReadConf {
     return confParser.longConf().option(SparkReadOptions.END_TIMESTAMP).parseOptional();
   }
 
+  public Integer maxFilesPerMicroBatch() {
+    return confParser
+        .intConf()
+        .option(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH)
+        .defaultValue(Integer.MAX_VALUE)
+        .parse();
+  }
+
+  public Integer maxRecordsPerMicroBatch() {
+    return confParser
+        .intConf()
+        .option(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH)
+        .defaultValue(Integer.MAX_VALUE)
+        .parse();
+  }
+
   public boolean preserveDataGrouping() {
     return confParser
         .booleanConf()
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
index 9063e0f9ab..80d60cf872 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
@@ -84,6 +84,13 @@ public class SparkReadOptions {
   // Timestamp in milliseconds; start a stream from the snapshot that occurs after this timestamp
   public static final String STREAM_FROM_TIMESTAMP = "stream-from-timestamp";
 
+  // maximum files per micro-batch
+  public static final String STREAMING_MAX_FILES_PER_MICRO_BATCH =
+      "streaming-max-files-per-micro-batch";
+  // maximum rows per micro-batch
+  public static final String STREAMING_MAX_ROWS_PER_MICRO_BATCH =
+      "streaming-max-rows-per-micro-batch";
+
   // Table path
   public static final String PATH = "path";
 
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index 6e03dd69a8..bb419e9951 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -30,6 +30,7 @@ import java.util.Locale;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DataOperations;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.MicroBatches;
 import org.apache.iceberg.MicroBatches.MicroBatch;
 import org.apache.iceberg.Schema;
@@ -38,6 +39,7 @@ import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
@@ -48,6 +50,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkReadConf;
 import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.TableScanUtil;
@@ -59,10 +62,12 @@ import org.apache.spark.sql.connector.read.InputPartition;
 import org.apache.spark.sql.connector.read.PartitionReaderFactory;
 import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
 import org.apache.spark.sql.connector.read.streaming.Offset;
+import org.apache.spark.sql.connector.read.streaming.ReadLimit;
+import org.apache.spark.sql.connector.read.streaming.SupportsAdmissionControl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class SparkMicroBatchStream implements MicroBatchStream {
+public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissionControl {
   private static final Joiner SLASH = Joiner.on("/");
   private static final Logger LOG = LoggerFactory.getLogger(SparkMicroBatchStream.class);
   private static final Types.StructType EMPTY_GROUPING_KEY_TYPE = Types.StructType.of();
@@ -80,6 +85,8 @@ public class SparkMicroBatchStream implements MicroBatchStream {
   private final boolean skipDelete;
   private final boolean skipOverwrite;
   private final Long fromTimestamp;
+  private final Integer maxFilesPerMicroBatch;
+  private final Integer maxRecordsPerMicroBatch;
 
   SparkMicroBatchStream(
       JavaSparkContext sparkContext,
@@ -97,6 +104,8 @@ public class SparkMicroBatchStream implements MicroBatchStream {
     this.splitLookback = readConf.splitLookback();
     this.splitOpenFileCost = readConf.splitOpenFileCost();
     this.fromTimestamp = readConf.streamFromTimestamp();
+    this.maxFilesPerMicroBatch = readConf.maxFilesPerMicroBatch();
+    this.maxRecordsPerMicroBatch = readConf.maxRecordsPerMicroBatch();
 
     InitialOffsetStore initialOffsetStore =
         new InitialOffsetStore(table, checkpointLocation, fromTimestamp);
@@ -118,16 +127,8 @@ public class SparkMicroBatchStream implements MicroBatchStream {
     }
 
     Snapshot latestSnapshot = table.currentSnapshot();
-    long addedFilesCount =
-        PropertyUtil.propertyAsLong(latestSnapshot.summary(), SnapshotSummary.ADDED_FILES_PROP, -1);
-    // if the latest snapshot summary doesn't contain SnapshotSummary.ADDED_FILES_PROP,
-    // iterate through addedDataFiles to compute addedFilesCount
-    addedFilesCount =
-        addedFilesCount == -1
-            ? Iterables.size(latestSnapshot.addedDataFiles(table.io()))
-            : addedFilesCount;
-
-    return new StreamingOffset(latestSnapshot.snapshotId(), addedFilesCount, false);
+
+    return new StreamingOffset(latestSnapshot.snapshotId(), addedFilesCount(latestSnapshot), false);
   }
 
   @Override
@@ -204,12 +205,20 @@ public class SparkMicroBatchStream implements MicroBatchStream {
 
     StreamingOffset currentOffset = null;
 
+    // [(startOffset : startFileIndex), (endOffset : endFileIndex) )
     do {
+      long endFileIndex;
       if (currentOffset == null) {
         currentOffset = batchStartOffset;
       } else {
         Snapshot snapshotAfter = SnapshotUtil.snapshotAfter(table, currentOffset.snapshotId());
-        currentOffset = new StreamingOffset(snapshotAfter.snapshotId(), 0L, false);
+        // it may happen that this snapshot needs to be read only partially if it is the
+        // endOffset snapshot.
+        if (currentOffset.snapshotId() != endOffset.snapshotId()) {
+          currentOffset = new StreamingOffset(snapshotAfter.snapshotId(), 0L, false);
+        } else {
+          currentOffset = endOffset;
+        }
       }
 
       Snapshot snapshot = table.snapshot(currentOffset.snapshotId());
@@ -226,12 +235,22 @@ public class SparkMicroBatchStream implements MicroBatchStream {
         continue;
       }
 
+      Snapshot currentSnapshot = table.snapshot(currentOffset.snapshotId());
+      if (currentOffset.snapshotId() == endOffset.snapshotId()) {
+        endFileIndex = endOffset.position();
+      } else {
+        endFileIndex = addedFilesCount(currentSnapshot);
+      }
+
       MicroBatch latestMicroBatch =
-          MicroBatches.from(table.snapshot(currentOffset.snapshotId()), table.io())
+          MicroBatches.from(currentSnapshot, table.io())
               .caseSensitive(caseSensitive)
               .specsById(table.specs())
               .generate(
-                  currentOffset.position(), Long.MAX_VALUE, currentOffset.shouldScanAllFiles());
+                  currentOffset.position(),
+                  endFileIndex,
+                  Long.MAX_VALUE,
+                  currentOffset.shouldScanAllFiles());
 
       fileScanTasks.addAll(latestMicroBatch.tasks());
     } while (currentOffset.snapshotId() != endOffset.snapshotId());
@@ -295,6 +314,128 @@ public class SparkMicroBatchStream implements MicroBatchStream {
     }
   }
 
+  @Override
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
+  public Offset latestOffset(Offset startOffset, ReadLimit limit) {
+    // calculate the end offset; the snapshotId comes from the startOffset
+    Preconditions.checkArgument(
+        startOffset instanceof StreamingOffset,
+        "Invalid start offset: %s is not a StreamingOffset",
+        startOffset);
+
+    table.refresh();
+    if (table.currentSnapshot() == null) {
+      return StreamingOffset.START_OFFSET;
+    }
+
+    if (table.currentSnapshot().timestampMillis() < fromTimestamp) {
+      return StreamingOffset.START_OFFSET;
+    }
+
+    // the end offset can span multiple snapshots
+    StreamingOffset startingOffset = (StreamingOffset) startOffset;
+
+    if (startOffset.equals(StreamingOffset.START_OFFSET)) {
+      startingOffset = determineStartingOffset(table, fromTimestamp);
+    }
+
+    Snapshot curSnapshot = table.snapshot(startingOffset.snapshotId());
+    int startPosOfSnapOffset = (int) startingOffset.position();
+
+    boolean scanAllFiles = startingOffset.shouldScanAllFiles();
+
+    boolean shouldContinueReading = true;
+    int curFilesAdded = 0;
+    int curRecordCount = 0;
+    int curPos = 0;
+
+    // Note: the position in the produced nextOffset is exclusive
+    while (shouldContinueReading) {
+      // generate manifest index for the curSnapshot
+      List<Pair<ManifestFile, Integer>> indexedManifests =
+          MicroBatches.skippedManifestIndexesFromSnapshot(
+              table.io(), curSnapshot, startPosOfSnapOffset, scanAllFiles);
+      // this assumes at least 1 file can be added to the new offset
+      for (int idx = 0; idx < indexedManifests.size() && shouldContinueReading; idx++) {
+        // curPos is guaranteed to be >= startFileIndex
+        curPos = indexedManifests.get(idx).second();
+        try (CloseableIterable<FileScanTask> taskIterable =
+                MicroBatches.openManifestFile(
+                    table.io(),
+                    table.specs(),
+                    caseSensitive,
+                    curSnapshot,
+                    indexedManifests.get(idx).first(),
+                    scanAllFiles);
+            CloseableIterator<FileScanTask> taskIter = taskIterable.iterator()) {
+          while (taskIter.hasNext()) {
+            FileScanTask task = taskIter.next();
+            if (curPos >= startPosOfSnapOffset) {
+              // TODO: use the readLimit provided in the function parameter; the read limits are
+              // derived from these 2 properties.
+              if ((curFilesAdded + 1) > maxFilesPerMicroBatch
+                  || (curRecordCount + task.file().recordCount()) > maxRecordsPerMicroBatch) {
+                shouldContinueReading = false;
+                break;
+              }
+
+              curFilesAdded += 1;
+              curRecordCount += task.file().recordCount();
+            }
+            ++curPos;
+          }
+        } catch (IOException ioe) {
+          LOG.warn("Failed to close task iterable", ioe);
+        }
+      }
+      // if the current snapshot is also the most recent snapshot, then break
+      if (curSnapshot.snapshotId() == table.currentSnapshot().snapshotId()) {
+        break;
+      }
+
+      // if the complete snapshot was consumed, move on to the next snapshot
+      if (shouldContinueReading) {
+        startPosOfSnapOffset = -1;
+        curSnapshot = SnapshotUtil.snapshotAfter(table, curSnapshot.snapshotId());
+        // when moving to the next snapshot, only added files should be scanned
+        scanAllFiles = false;
+      }
+    }
+
+    StreamingOffset latestStreamingOffset =
+        new StreamingOffset(curSnapshot.snapshotId(), curPos, scanAllFiles);
+
+    // if no new data arrived, then return null.
+    return latestStreamingOffset.equals(startingOffset) ? null : latestStreamingOffset;
+  }
+
+  private long addedFilesCount(Snapshot snapshot) {
+    long addedFilesCount =
+        PropertyUtil.propertyAsLong(snapshot.summary(), SnapshotSummary.ADDED_FILES_PROP, -1);
+    // If the snapshot summary doesn't have SnapshotSummary.ADDED_FILES_PROP,
+    // iterate through the added data files to compute the count.
+    return addedFilesCount == -1
+        ? Iterables.size(snapshot.addedDataFiles(table.io()))
+        : addedFilesCount;
+  }
+
+  @Override
+  public ReadLimit getDefaultReadLimit() {
+    if (maxFilesPerMicroBatch != Integer.MAX_VALUE
+        && maxRecordsPerMicroBatch != Integer.MAX_VALUE) {
+      ReadLimit[] readLimits = new ReadLimit[2];
+      readLimits[0] = ReadLimit.maxFiles(maxFilesPerMicroBatch);
+      readLimits[1] = ReadLimit.maxRows(maxRecordsPerMicroBatch);
+      return ReadLimit.compositeLimit(readLimits);
+    } else if (maxFilesPerMicroBatch != Integer.MAX_VALUE) {
+      return ReadLimit.maxFiles(maxFilesPerMicroBatch);
+    } else if (maxRecordsPerMicroBatch != Integer.MAX_VALUE) {
+      return ReadLimit.maxRows(maxRecordsPerMicroBatch);
+    } else {
+      return ReadLimit.allAvailable();
+    }
+  }
+
   private static class InitialOffsetStore {
     private final Table table;
     private final FileIO io;
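
A conceptual sketch of the admission-control call order this class now participates in. Assumptions: the sketch would need to live in the org.apache.iceberg.spark.source package to see the package-private SparkMicroBatchStream, and the real loop is Spark's MicroBatchExecution, not user code:

    import org.apache.spark.sql.connector.read.InputPartition;
    import org.apache.spark.sql.connector.read.streaming.Offset;
    import org.apache.spark.sql.connector.read.streaming.ReadLimit;

    final class AdmissionControlFlowSketch {
      static void runOneTrigger(SparkMicroBatchStream stream, Offset lastCommitted) {
        // 1. Spark asks the source for its default limit, derived from the new read options
        //    (max files / max rows per micro-batch), falling back to ReadLimit.allAvailable().
        ReadLimit limit = stream.getDefaultReadLimit();

        // 2. The source proposes the furthest offset it is willing to serve under that limit;
        //    in this implementation, null means no new data since lastCommitted.
        Offset next = stream.latestOffset(lastCommitted, limit);
        if (next == null) {
          return;
        }

        // 3. The batch is planned strictly within [lastCommitted, next).
        InputPartition[] partitions = stream.planInputPartitions(lastCommitted, next);

        // 4. After the partitions are executed, the end offset is committed so the next
        //    trigger resumes from it.
        stream.commit(next);
      }
    }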
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index dd456f2237..a2d0c9acaf 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -27,6 +27,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.BaseTable;
@@ -47,6 +48,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkCatalogTestBase;
 import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.spark.api.java.function.VoidFunction2;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
@@ -70,6 +72,8 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
 
   private Table table;
 
+  private final AtomicInteger microBatches = new AtomicInteger();
+
   /**
    * test data to be used by multiple writes each write creates a snapshot and writes a list of
    * records
@@ -115,6 +119,7 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
             + "PARTITIONED BY (bucket(3, id))",
         tableName);
     this.table = validationCatalog.loadTable(tableIdent);
+    microBatches.set(0);
   }
 
   @After
@@ -140,6 +145,57 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
     Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
   }
 
+  @Test
+  public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfFiles_1()
+      throws Exception {
+    appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+
+    Assert.assertEquals(
+        6,
+        microBatchCount(
+            ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")));
+  }
+
+  @Test
+  public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfFiles_2()
+      throws Exception {
+    appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+
+    Assert.assertEquals(
+        3,
+        microBatchCount(
+            ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "2")));
+  }
+
+  @Test
+  public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfRows_1()
+      throws Exception {
+    appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+
+    // only 1 micro-batch will be formed and we will read data partially
+    Assert.assertEquals(
+        1,
+        microBatchCount(ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "1")));
+
+    StreamingQuery query = startStream(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "1");
+
+    // check answer correctness: only 1 record is read and the micro-batch will be stuck
+    List<SimpleRecord> actual = rowsAvailable(query);
+    Assertions.assertThat(actual)
+        .containsExactlyInAnyOrderElementsOf(
+            Lists.newArrayList(TEST_DATA_MULTIPLE_SNAPSHOTS.get(0).get(0)));
+  }
+
+  @Test
+  public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfRows_4()
+      throws Exception {
+    appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+
+    Assert.assertEquals(
+        2,
+        microBatchCount(ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "4")));
+  }
+
   @Test
   public void testReadStreamOnIcebergThenAddData() throws Exception {
     List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
@@ -561,7 +617,25 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
   }
 
   private StreamingQuery startStream(String key, String value) throws TimeoutException {
-    return startStream(ImmutableMap.of(key, value));
+    return startStream(
+        ImmutableMap.of(key, value, SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1"));
+  }
+
+  private int microBatchCount(Map<String, String> options) throws TimeoutException {
+    Dataset<Row> ds = spark.readStream().options(options).format("iceberg").load(tableName);
+
+    ds.writeStream()
+        .options(options)
+        .foreachBatch(
+            (VoidFunction2<Dataset<Row>, Long>)
+                (dataset, batchId) -> {
+                  microBatches.getAndIncrement();
+                })
+        .start()
+        .processAllAvailable();
+
+    stopStreams();
+    return microBatches.get();
   }
 
   private List<SimpleRecord> rowsAvailable(StreamingQuery query) {