Posted to commits@iceberg.apache.org by ja...@apache.org on 2023/04/22 17:24:56 UTC
[iceberg] branch master updated: Spark 3.3: support rate limit in Spark Streaming (#4479)
This is an automated email from the ASF dual-hosted git repository.
jackye pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 029622b420 Spark 3.3: support rate limit in Spark Streaming (#4479)
029622b420 is described below
commit 029622b42091a17ca386977dba4a68f511ffba21
Author: Prashant Singh <35...@users.noreply.github.com>
AuthorDate: Sat Apr 22 10:24:49 2023 -0700
Spark 3.3: support rate limit in Spark Streaming (#4479)
---
.../main/java/org/apache/iceberg/MicroBatches.java | 209 ++++++++++++---------
.../org/apache/iceberg/TestMicroBatchBuilder.java | 20 +-
.../org/apache/iceberg/spark/SparkReadConf.java | 16 ++
.../org/apache/iceberg/spark/SparkReadOptions.java | 7 +
.../spark/source/SparkMicroBatchStream.java | 169 +++++++++++++++--
.../spark/source/TestStructuredStreamingRead3.java | 76 +++++++-
6 files changed, 384 insertions(+), 113 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/MicroBatches.java b/core/src/main/java/org/apache/iceberg/MicroBatches.java
index e066e1b314..d96246f15b 100644
--- a/core/src/main/java/org/apache/iceberg/MicroBatches.java
+++ b/core/src/main/java/org/apache/iceberg/MicroBatches.java
@@ -28,6 +28,7 @@ import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.Pair;
import org.slf4j.Logger;
@@ -36,6 +37,96 @@ import org.slf4j.LoggerFactory;
public class MicroBatches {
private MicroBatches() {}
+ public static List<Pair<ManifestFile, Integer>> skippedManifestIndexesFromSnapshot(
+ FileIO io, Snapshot snapshot, long startFileIndex, boolean scanAllFiles) {
+ List<ManifestFile> manifests =
+ scanAllFiles
+ ? snapshot.dataManifests(io)
+ : snapshot.dataManifests(io).stream()
+ .filter(m -> m.snapshotId().equals(snapshot.snapshotId()))
+ .collect(Collectors.toList());
+
+ List<Pair<ManifestFile, Integer>> manifestIndexes = indexManifests(manifests);
+
+ return skipManifests(manifestIndexes, startFileIndex);
+ }
+
+ public static CloseableIterable<FileScanTask> openManifestFile(
+ FileIO io,
+ Map<Integer, PartitionSpec> specsById,
+ boolean caseSensitive,
+ Snapshot snapshot,
+ ManifestFile manifestFile,
+ boolean scanAllFiles) {
+
+ ManifestGroup manifestGroup =
+ new ManifestGroup(io, ImmutableList.of(manifestFile))
+ .specsById(specsById)
+ .caseSensitive(caseSensitive);
+ if (!scanAllFiles) {
+ manifestGroup =
+ manifestGroup
+ .filterManifestEntries(
+ entry ->
+ entry.snapshotId() == snapshot.snapshotId()
+ && entry.status() == ManifestEntry.Status.ADDED)
+ .ignoreDeleted();
+ }
+
+ return manifestGroup.planFiles();
+ }
+
+ /**
+ * Method to index the data files for each manifest. For example, if manifest m1 has 3 data files,
+ * manifest m2 has 2 data files, manifest m3 has 1 data file, then the index will be (m1, 0), (m2,
+ * 3), (m3, 5).
+ *
+ * @param manifestFiles List of input manifests used to index.
+ * @return a list pairing each manifest with the index number of the first data file entry in that
+ * manifest.
+ */
+ private static List<Pair<ManifestFile, Integer>> indexManifests(
+ List<ManifestFile> manifestFiles) {
+ int currentFileIndex = 0;
+ List<Pair<ManifestFile, Integer>> manifestIndexes = Lists.newArrayList();
+
+ for (ManifestFile manifest : manifestFiles) {
+ manifestIndexes.add(Pair.of(manifest, currentFileIndex));
+ currentFileIndex += manifest.addedFilesCount() + manifest.existingFilesCount();
+ }
+
+ return manifestIndexes;
+ }
+
+ /**
+ * Method to skip manifest files whose data files all come before startFileIndex. For example, if
+ * the index list is: (m1, 0), (m2, 3), (m3, 5), and startFileIndex is 4, then the returned
+ * manifest index list is: (m2, 3), (m3, 5).
+ *
+ * @param indexedManifests List of input manifests.
+ * @param startFileIndex File index from which reading resumes; manifests whose data files all
+ * precede this index are skipped.
+ * @return a sub-list of the indexed manifests containing only those still needed to read from
+ * startFileIndex onward.
+ */
+ private static List<Pair<ManifestFile, Integer>> skipManifests(
+ List<Pair<ManifestFile, Integer>> indexedManifests, long startFileIndex) {
+ if (startFileIndex == 0) {
+ return indexedManifests;
+ }
+
+ int manifestIndex = 0;
+ for (Pair<ManifestFile, Integer> manifest : indexedManifests) {
+ if (manifest.second() > startFileIndex) {
+ break;
+ }
+
+ manifestIndex++;
+ }
+
+ return indexedManifests.subList(Math.max(manifestIndex - 1, 0), indexedManifests.size());
+ }
+
public static class MicroBatch {
private final long snapshotId;
private final long startFileIndex;
@@ -113,73 +204,27 @@ public class MicroBatches {
}
public MicroBatch generate(long startFileIndex, long targetSizeInBytes, boolean scanAllFiles) {
+ return generate(
+ startFileIndex,
+ Iterables.size(snapshot.addedDataFiles(io)),
+ targetSizeInBytes,
+ scanAllFiles);
+ }
+
+ public MicroBatch generate(
+ long startFileIndex, long endFileIndex, long targetSizeInBytes, boolean scanAllFiles) {
+ Preconditions.checkArgument(endFileIndex >= 0, "endFileIndex is unexpectedly smaller than 0");
Preconditions.checkArgument(
startFileIndex >= 0, "startFileIndex is unexpectedly smaller than 0");
Preconditions.checkArgument(
targetSizeInBytes > 0, "targetSizeInBytes should be larger than 0");
- List<ManifestFile> manifests =
- scanAllFiles
- ? snapshot.dataManifests(io)
- : snapshot.dataManifests(io).stream()
- .filter(m -> m.snapshotId().equals(snapshot.snapshotId()))
- .collect(Collectors.toList());
-
- List<Pair<ManifestFile, Integer>> manifestIndexes = indexManifests(manifests);
- List<Pair<ManifestFile, Integer>> skippedManifestIndexes =
- skipManifests(manifestIndexes, startFileIndex);
-
return generateMicroBatch(
- skippedManifestIndexes, startFileIndex, targetSizeInBytes, scanAllFiles);
- }
-
- /**
- * Method to index the data files for each manifest. For example, if manifest m1 has 3 data
- * files, manifest m2 has 2 data files, manifest m3 has 1 data file, then the index will be (m1,
- * 0), (m2, 3), (m3, 5).
- *
- * @param manifestFiles List of input manifests used to index.
- * @return a list of manifest index with key as manifest file, value as file counts.
- */
- private static List<Pair<ManifestFile, Integer>> indexManifests(
- List<ManifestFile> manifestFiles) {
- int currentFileIndex = 0;
- List<Pair<ManifestFile, Integer>> manifestIndexes = Lists.newArrayList();
-
- for (ManifestFile manifest : manifestFiles) {
- manifestIndexes.add(Pair.of(manifest, currentFileIndex));
- currentFileIndex += manifest.addedFilesCount() + manifest.existingFilesCount();
- }
-
- return manifestIndexes;
- }
-
- /**
- * Method to skip the manifest file in which the index is smaller than startFileIndex. For
- * example, if the index list is : (m1, 0), (m2, 3), (m3, 5), and startFileIndex is 4, then the
- * returned manifest index list is: (m2, 3), (m3, 5).
- *
- * @param indexedManifests List of input manifests.
- * @param startFileIndex Index used to skip the processed manifests.
- * @return a sub-list of manifest file index which only contains the manifest indexes larger
- * than the startFileIndex.
- */
- private static List<Pair<ManifestFile, Integer>> skipManifests(
- List<Pair<ManifestFile, Integer>> indexedManifests, long startFileIndex) {
- if (startFileIndex == 0) {
- return indexedManifests;
- }
-
- int manifestIndex = 0;
- for (Pair<ManifestFile, Integer> manifest : indexedManifests) {
- if (manifest.second() > startFileIndex) {
- break;
- }
-
- manifestIndex++;
- }
-
- return indexedManifests.subList(manifestIndex - 1, indexedManifests.size());
+ skippedManifestIndexesFromSnapshot(io, snapshot, startFileIndex, scanAllFiles),
+ startFileIndex,
+ endFileIndex,
+ targetSizeInBytes,
+ scanAllFiles);
}
/**
@@ -188,25 +233,23 @@ public class MicroBatches {
*
* @param indexedManifests A list of indexed manifests to generate MicroBatch
* @param startFileIndex A startFileIndex used to skip processed files.
+ * @param endFileIndex The file index at which to stop including files, exclusive.
* @param targetSizeInBytes Used to control the size of MicroBatch, the processed file bytes
* must be smaller than this size.
* @param scanAllFiles Used to check whether all the data files should be processed, or only
* added files.
* @return A MicroBatch.
*/
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
private MicroBatch generateMicroBatch(
List<Pair<ManifestFile, Integer>> indexedManifests,
long startFileIndex,
+ long endFileIndex,
long targetSizeInBytes,
boolean scanAllFiles) {
if (indexedManifests.isEmpty()) {
return new MicroBatch(
- snapshot.snapshotId(),
- startFileIndex,
- startFileIndex + 1,
- 0L,
- Collections.emptyList(),
- true);
+ snapshot.snapshotId(), startFileIndex, endFileIndex, 0L, Collections.emptyList(), true);
}
long currentSizeInBytes = 0L;
@@ -218,11 +261,18 @@ public class MicroBatches {
currentFileIndex = indexedManifests.get(idx).second();
try (CloseableIterable<FileScanTask> taskIterable =
- open(indexedManifests.get(idx).first(), scanAllFiles);
+ openManifestFile(
+ io,
+ specsById,
+ caseSensitive,
+ snapshot,
+ indexedManifests.get(idx).first(),
+ scanAllFiles);
CloseableIterator<FileScanTask> taskIter = taskIterable.iterator()) {
while (taskIter.hasNext()) {
FileScanTask task = taskIter.next();
- if (currentFileIndex >= startFileIndex) {
+ // want to read [startFileIndex ... endFileIndex)
+ if (currentFileIndex >= startFileIndex && currentFileIndex < endFileIndex) {
// Make sure there's at least one task in each MicroBatch to avoid the job getting stuck;
// always add the first task.
@@ -231,7 +281,7 @@ public class MicroBatches {
}
currentFileIndex++;
- if (currentSizeInBytes >= targetSizeInBytes) {
+ if (currentSizeInBytes >= targetSizeInBytes || currentFileIndex >= endFileIndex) {
break;
}
}
@@ -259,6 +309,7 @@ public class MicroBatches {
}
}
+ // [startFileIndex ... currentFileIndex)
return new MicroBatch(
snapshot.snapshotId(),
startFileIndex,
@@ -267,23 +318,5 @@ public class MicroBatches {
tasks,
isLastIndex);
}
-
- private CloseableIterable<FileScanTask> open(ManifestFile manifestFile, boolean scanAllFiles) {
- ManifestGroup manifestGroup =
- new ManifestGroup(io, ImmutableList.of(manifestFile))
- .specsById(specsById)
- .caseSensitive(caseSensitive);
- if (!scanAllFiles) {
- manifestGroup =
- manifestGroup
- .filterManifestEntries(
- entry ->
- entry.snapshotId() == snapshot.snapshotId()
- && entry.status() == ManifestEntry.Status.ADDED)
- .ignoreDeleted();
- }
-
- return manifestGroup.planFiles();
- }
}
}
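
For orientation, here is a minimal usage sketch of the refactored builder with the new exclusive end bound; the table handle and the concrete file indexes are illustrative assumptions (it also assumes the table has a committed snapshot), and the same call shape appears in the TestMicroBatchBuilder changes below.

    import org.apache.iceberg.MicroBatches;
    import org.apache.iceberg.MicroBatches.MicroBatch;
    import org.apache.iceberg.Table;

    class MicroBatchRangeSketch {
      // Generate a micro-batch covering the half-open file range [0, 3) of the current snapshot,
      // ignoring the byte-size limit and scanning only files added by that snapshot.
      static MicroBatch firstThreeFiles(Table table) {
        return MicroBatches.from(table.currentSnapshot(), table.io())
            .specsById(table.specs())
            .caseSensitive(true)
            .generate(0, 3, Long.MAX_VALUE, false);
      }
    }

Here endFileIndex is exclusive, matching the [startFileIndex ... endFileIndex) convention noted in generateMicroBatch.
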
diff --git a/core/src/test/java/org/apache/iceberg/TestMicroBatchBuilder.java b/core/src/test/java/org/apache/iceberg/TestMicroBatchBuilder.java
index b907a50319..deb6e7c8ad 100644
--- a/core/src/test/java/org/apache/iceberg/TestMicroBatchBuilder.java
+++ b/core/src/test/java/org/apache/iceberg/TestMicroBatchBuilder.java
@@ -52,7 +52,7 @@ public class TestMicroBatchBuilder extends TableTestBase {
MicroBatch batch =
MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
- .generate(0, Long.MAX_VALUE, true);
+ .generate(0, 6, Long.MAX_VALUE, true);
Assert.assertEquals(batch.snapshotId(), 1L);
Assert.assertEquals(batch.startFileIndex(), 0);
Assert.assertEquals(batch.endFileIndex(), 5);
@@ -63,7 +63,7 @@ public class TestMicroBatchBuilder extends TableTestBase {
MicroBatch batch1 =
MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
- .generate(0, 15L, true);
+ .generate(0, 1, 15L, true);
Assert.assertEquals(batch1.endFileIndex(), 1);
Assert.assertEquals(batch1.sizeInBytes(), 10);
Assert.assertFalse(batch1.lastIndexOfSnapshot());
@@ -72,7 +72,7 @@ public class TestMicroBatchBuilder extends TableTestBase {
MicroBatch batch2 =
MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
- .generate(batch1.endFileIndex(), 30L, true);
+ .generate(batch1.endFileIndex(), 4, 30L, true);
Assert.assertEquals(batch2.endFileIndex(), 4);
Assert.assertEquals(batch2.sizeInBytes(), 30);
Assert.assertFalse(batch2.lastIndexOfSnapshot());
@@ -81,7 +81,7 @@ public class TestMicroBatchBuilder extends TableTestBase {
MicroBatch batch3 =
MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
- .generate(batch2.endFileIndex(), 50L, true);
+ .generate(batch2.endFileIndex(), 5, 50L, true);
Assert.assertEquals(batch3.endFileIndex(), 5);
Assert.assertEquals(batch3.sizeInBytes(), 10);
Assert.assertTrue(batch3.lastIndexOfSnapshot());
@@ -95,7 +95,7 @@ public class TestMicroBatchBuilder extends TableTestBase {
MicroBatch batch =
MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
- .generate(0, 10L, true);
+ .generate(0, 1, 10L, true);
Assert.assertEquals(batch.snapshotId(), 1L);
Assert.assertEquals(batch.startFileIndex(), 0);
Assert.assertEquals(batch.endFileIndex(), 1);
@@ -106,7 +106,7 @@ public class TestMicroBatchBuilder extends TableTestBase {
MicroBatch batch1 =
MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
- .generate(batch.endFileIndex(), 5L, true);
+ .generate(batch.endFileIndex(), 2, 5L, true);
Assert.assertEquals(batch1.endFileIndex(), 2);
Assert.assertEquals(batch1.sizeInBytes(), 10);
filesMatch(Lists.newArrayList("B"), filesToScan(batch1.tasks()));
@@ -115,7 +115,7 @@ public class TestMicroBatchBuilder extends TableTestBase {
MicroBatch batch2 =
MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
- .generate(batch1.endFileIndex(), 10L, true);
+ .generate(batch1.endFileIndex(), 3, 10L, true);
Assert.assertEquals(batch2.endFileIndex(), 3);
Assert.assertEquals(batch2.sizeInBytes(), 10);
filesMatch(Lists.newArrayList("C"), filesToScan(batch2.tasks()));
@@ -124,7 +124,7 @@ public class TestMicroBatchBuilder extends TableTestBase {
MicroBatch batch3 =
MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
- .generate(batch2.endFileIndex(), 10L, true);
+ .generate(batch2.endFileIndex(), 4, 10L, true);
Assert.assertEquals(batch3.endFileIndex(), 4);
Assert.assertEquals(batch3.sizeInBytes(), 10);
filesMatch(Lists.newArrayList("D"), filesToScan(batch3.tasks()));
@@ -133,7 +133,7 @@ public class TestMicroBatchBuilder extends TableTestBase {
MicroBatch batch4 =
MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
- .generate(batch3.endFileIndex(), 5L, true);
+ .generate(batch3.endFileIndex(), 5, 5L, true);
Assert.assertEquals(batch4.endFileIndex(), 5);
Assert.assertEquals(batch4.sizeInBytes(), 10);
filesMatch(Lists.newArrayList("E"), filesToScan(batch4.tasks()));
@@ -142,7 +142,7 @@ public class TestMicroBatchBuilder extends TableTestBase {
MicroBatch batch5 =
MicroBatches.from(table.snapshot(1L), table.io())
.specsById(table.specs())
- .generate(batch4.endFileIndex(), 5L, true);
+ .generate(batch4.endFileIndex(), 5, 5L, true);
Assert.assertEquals(batch5.endFileIndex(), 5);
Assert.assertEquals(batch5.sizeInBytes(), 0);
Assert.assertTrue(Iterables.isEmpty(batch5.tasks()));
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index 1c1182c4da..dbd2613dde 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -255,6 +255,22 @@ public class SparkReadConf {
return confParser.longConf().option(SparkReadOptions.END_TIMESTAMP).parseOptional();
}
+ public Integer maxFilesPerMicroBatch() {
+ return confParser
+ .intConf()
+ .option(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH)
+ .defaultValue(Integer.MAX_VALUE)
+ .parse();
+ }
+
+ public Integer maxRecordsPerMicroBatch() {
+ return confParser
+ .intConf()
+ .option(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH)
+ .defaultValue(Integer.MAX_VALUE)
+ .parse();
+ }
+
public boolean preserveDataGrouping() {
return confParser
.booleanConf()
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
index 9063e0f9ab..80d60cf872 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
@@ -84,6 +84,13 @@ public class SparkReadOptions {
// Timestamp in milliseconds; start a stream from the snapshot that occurs after this timestamp
public static final String STREAM_FROM_TIMESTAMP = "stream-from-timestamp";
+ // maximum number of files per micro-batch
+ public static final String STREAMING_MAX_FILES_PER_MICRO_BATCH =
+ "streaming-max-files-per-micro-batch";
+ // maximum number of rows per micro-batch
+ public static final String STREAMING_MAX_ROWS_PER_MICRO_BATCH =
+ "streaming-max-rows-per-micro-batch";
+
// Table path
public static final String PATH = "path";
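
The two new options translate into per-trigger rate limits on the streaming read. A minimal sketch of setting them on a reader is below; the table identifier and the limit values are illustrative assumptions, not part of this commit.

    import org.apache.iceberg.spark.SparkReadOptions;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    class RateLimitedReadSketch {
      // Build a streaming read capped at 10 data files and 10000 rows admitted per micro-batch.
      static Dataset<Row> rateLimitedStream(SparkSession spark) {
        return spark
            .readStream()
            .format("iceberg")
            .option(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "10")
            .option(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "10000")
            .load("db.table"); // hypothetical table identifier
      }
    }
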
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index 6e03dd69a8..bb419e9951 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -30,6 +30,7 @@ import java.util.Locale;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.MicroBatches;
import org.apache.iceberg.MicroBatches.MicroBatch;
import org.apache.iceberg.Schema;
@@ -38,6 +39,7 @@ import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
@@ -48,6 +50,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.TableScanUtil;
@@ -59,10 +62,12 @@ import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.connector.read.streaming.Offset;
+import org.apache.spark.sql.connector.read.streaming.ReadLimit;
+import org.apache.spark.sql.connector.read.streaming.SupportsAdmissionControl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class SparkMicroBatchStream implements MicroBatchStream {
+public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissionControl {
private static final Joiner SLASH = Joiner.on("/");
private static final Logger LOG = LoggerFactory.getLogger(SparkMicroBatchStream.class);
private static final Types.StructType EMPTY_GROUPING_KEY_TYPE = Types.StructType.of();
@@ -80,6 +85,8 @@ public class SparkMicroBatchStream implements MicroBatchStream {
private final boolean skipDelete;
private final boolean skipOverwrite;
private final Long fromTimestamp;
+ private final Integer maxFilesPerMicroBatch;
+ private final Integer maxRecordsPerMicroBatch;
SparkMicroBatchStream(
JavaSparkContext sparkContext,
@@ -97,6 +104,8 @@ public class SparkMicroBatchStream implements MicroBatchStream {
this.splitLookback = readConf.splitLookback();
this.splitOpenFileCost = readConf.splitOpenFileCost();
this.fromTimestamp = readConf.streamFromTimestamp();
+ this.maxFilesPerMicroBatch = readConf.maxFilesPerMicroBatch();
+ this.maxRecordsPerMicroBatch = readConf.maxRecordsPerMicroBatch();
InitialOffsetStore initialOffsetStore =
new InitialOffsetStore(table, checkpointLocation, fromTimestamp);
@@ -118,16 +127,8 @@ public class SparkMicroBatchStream implements MicroBatchStream {
}
Snapshot latestSnapshot = table.currentSnapshot();
- long addedFilesCount =
- PropertyUtil.propertyAsLong(latestSnapshot.summary(), SnapshotSummary.ADDED_FILES_PROP, -1);
- // if the latest snapshot summary doesn't contain SnapshotSummary.ADDED_FILES_PROP,
- // iterate through addedDataFiles to compute addedFilesCount
- addedFilesCount =
- addedFilesCount == -1
- ? Iterables.size(latestSnapshot.addedDataFiles(table.io()))
- : addedFilesCount;
-
- return new StreamingOffset(latestSnapshot.snapshotId(), addedFilesCount, false);
+
+ return new StreamingOffset(latestSnapshot.snapshotId(), addedFilesCount(latestSnapshot), false);
}
@Override
@@ -204,12 +205,20 @@ public class SparkMicroBatchStream implements MicroBatchStream {
StreamingOffset currentOffset = null;
+ // [(startOffset : startFileIndex), (endOffset : endFileIndex) )
do {
+ long endFileIndex;
if (currentOffset == null) {
currentOffset = batchStartOffset;
} else {
Snapshot snapshotAfter = SnapshotUtil.snapshotAfter(table, currentOffset.snapshotId());
- currentOffset = new StreamingOffset(snapshotAfter.snapshotId(), 0L, false);
+ // it may happen that we need to read this snapshot partially in case it's equal to
+ // endOffset.
+ if (currentOffset.snapshotId() != endOffset.snapshotId()) {
+ currentOffset = new StreamingOffset(snapshotAfter.snapshotId(), 0L, false);
+ } else {
+ currentOffset = endOffset;
+ }
}
Snapshot snapshot = table.snapshot(currentOffset.snapshotId());
@@ -226,12 +235,22 @@ public class SparkMicroBatchStream implements MicroBatchStream {
continue;
}
+ Snapshot currentSnapshot = table.snapshot(currentOffset.snapshotId());
+ if (currentOffset.snapshotId() == endOffset.snapshotId()) {
+ endFileIndex = endOffset.position();
+ } else {
+ endFileIndex = addedFilesCount(currentSnapshot);
+ }
+
MicroBatch latestMicroBatch =
- MicroBatches.from(table.snapshot(currentOffset.snapshotId()), table.io())
+ MicroBatches.from(currentSnapshot, table.io())
.caseSensitive(caseSensitive)
.specsById(table.specs())
.generate(
- currentOffset.position(), Long.MAX_VALUE, currentOffset.shouldScanAllFiles());
+ currentOffset.position(),
+ endFileIndex,
+ Long.MAX_VALUE,
+ currentOffset.shouldScanAllFiles());
fileScanTasks.addAll(latestMicroBatch.tasks());
} while (currentOffset.snapshotId() != endOffset.snapshotId());
@@ -295,6 +314,128 @@ public class SparkMicroBatchStream implements MicroBatchStream {
}
}
+ @Override
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
+ public Offset latestOffset(Offset startOffset, ReadLimit limit) {
+ // calculate the end offset; get the snapshotId from the startOffset
+ Preconditions.checkArgument(
+ startOffset instanceof StreamingOffset,
+ "Invalid start offset: %s is not a StreamingOffset",
+ startOffset);
+
+ table.refresh();
+ if (table.currentSnapshot() == null) {
+ return StreamingOffset.START_OFFSET;
+ }
+
+ if (table.currentSnapshot().timestampMillis() < fromTimestamp) {
+ return StreamingOffset.START_OFFSET;
+ }
+
+ // end offset can expand to multiple snapshots
+ StreamingOffset startingOffset = (StreamingOffset) startOffset;
+
+ if (startOffset.equals(StreamingOffset.START_OFFSET)) {
+ startingOffset = determineStartingOffset(table, fromTimestamp);
+ }
+
+ Snapshot curSnapshot = table.snapshot(startingOffset.snapshotId());
+ int startPosOfSnapOffset = (int) startingOffset.position();
+
+ boolean scanAllFiles = startingOffset.shouldScanAllFiles();
+
+ boolean shouldContinueReading = true;
+ int curFilesAdded = 0;
+ int curRecordCount = 0;
+ int curPos = 0;
+
+ // Note: we produce nextOffset with a non-inclusive pos
+ while (shouldContinueReading) {
+ // generate manifest index for the curSnapshot
+ List<Pair<ManifestFile, Integer>> indexedManifests =
+ MicroBatches.skippedManifestIndexesFromSnapshot(
+ table.io(), curSnapshot, startPosOfSnapOffset, scanAllFiles);
+ // this is under the assumption that we will be able to add at least 1 file in the new offset
+ for (int idx = 0; idx < indexedManifests.size() && shouldContinueReading; idx++) {
+ // curPos is guaranteed to be >= startFileIndex here
+ curPos = indexedManifests.get(idx).second();
+ try (CloseableIterable<FileScanTask> taskIterable =
+ MicroBatches.openManifestFile(
+ table.io(),
+ table.specs(),
+ caseSensitive,
+ curSnapshot,
+ indexedManifests.get(idx).first(),
+ scanAllFiles);
+ CloseableIterator<FileScanTask> taskIter = taskIterable.iterator()) {
+ while (taskIter.hasNext()) {
+ FileScanTask task = taskIter.next();
+ if (curPos >= startPosOfSnapOffset) {
+ // TODO: use the readLimit provided in the function param; the readLimits are derived from
+ // these 2 properties.
+ if ((curFilesAdded + 1) > maxFilesPerMicroBatch
+ || (curRecordCount + task.file().recordCount()) > maxRecordsPerMicroBatch) {
+ shouldContinueReading = false;
+ break;
+ }
+
+ curFilesAdded += 1;
+ curRecordCount += task.file().recordCount();
+ }
+ ++curPos;
+ }
+ } catch (IOException ioe) {
+ LOG.warn("Failed to close task iterable", ioe);
+ }
+ }
+ // if the current snapshot is also the most recent snapshot, then break
+ if (curSnapshot.snapshotId() == table.currentSnapshot().snapshotId()) {
+ break;
+ }
+
+ // if everything was OK and we consumed the complete snapshot, then move to the next snapshot
+ if (shouldContinueReading) {
+ startPosOfSnapOffset = -1;
+ curSnapshot = SnapshotUtil.snapshotAfter(table, curSnapshot.snapshotId());
+ // if we are moving to the next snapshot, we should only scan added files
+ scanAllFiles = false;
+ }
+ }
+
+ StreamingOffset latestStreamingOffset =
+ new StreamingOffset(curSnapshot.snapshotId(), curPos, scanAllFiles);
+
+ // if no new data arrived, then return null.
+ return latestStreamingOffset.equals(startingOffset) ? null : latestStreamingOffset;
+ }
+
+ private long addedFilesCount(Snapshot snapshot) {
+ long addedFilesCount =
+ PropertyUtil.propertyAsLong(snapshot.summary(), SnapshotSummary.ADDED_FILES_PROP, -1);
+ // If snapshotSummary doesn't have SnapshotSummary.ADDED_FILES_PROP,
+ // iterate through addedFiles iterator to find addedFilesCount.
+ return addedFilesCount == -1
+ ? Iterables.size(snapshot.addedDataFiles(table.io()))
+ : addedFilesCount;
+ }
+
+ @Override
+ public ReadLimit getDefaultReadLimit() {
+ if (maxFilesPerMicroBatch != Integer.MAX_VALUE
+ && maxRecordsPerMicroBatch != Integer.MAX_VALUE) {
+ ReadLimit[] readLimits = new ReadLimit[2];
+ readLimits[0] = ReadLimit.maxFiles(maxFilesPerMicroBatch);
+ readLimits[1] = ReadLimit.maxRows(maxRecordsPerMicroBatch);
+ return ReadLimit.compositeLimit(readLimits);
+ } else if (maxFilesPerMicroBatch != Integer.MAX_VALUE) {
+ return ReadLimit.maxFiles(maxFilesPerMicroBatch);
+ } else if (maxRecordsPerMicroBatch != Integer.MAX_VALUE) {
+ return ReadLimit.maxRows(maxRecordsPerMicroBatch);
+ } else {
+ return ReadLimit.allAvailable();
+ }
+ }
+
private static class InitialOffsetStore {
private final Table table;
private final FileIO io;
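
To summarize the admission-control wiring in SparkMicroBatchStream: getDefaultReadLimit combines the two options, and a micro-batch stops admitting files once either bound would be exceeded. A condensed, standalone sketch of that selection logic follows (option values passed in as plain ints, with unset options represented by Integer.MAX_VALUE).

    import org.apache.spark.sql.connector.read.streaming.ReadLimit;

    class ReadLimitSelectionSketch {
      // Mirrors the getDefaultReadLimit() decision: both limits set -> composite limit,
      // one set -> that single limit, neither set -> read everything available.
      static ReadLimit defaultLimit(int maxFilesPerMicroBatch, int maxRecordsPerMicroBatch) {
        if (maxFilesPerMicroBatch != Integer.MAX_VALUE
            && maxRecordsPerMicroBatch != Integer.MAX_VALUE) {
          return ReadLimit.compositeLimit(
              new ReadLimit[] {
                ReadLimit.maxFiles(maxFilesPerMicroBatch),
                ReadLimit.maxRows(maxRecordsPerMicroBatch)
              });
        } else if (maxFilesPerMicroBatch != Integer.MAX_VALUE) {
          return ReadLimit.maxFiles(maxFilesPerMicroBatch);
        } else if (maxRecordsPerMicroBatch != Integer.MAX_VALUE) {
          return ReadLimit.maxRows(maxRecordsPerMicroBatch);
        } else {
          return ReadLimit.allAvailable();
        }
      }
    }

Note that in this commit latestOffset(Offset, ReadLimit) still derives its bounds from the two read-conf values directly (see the TODO above) rather than from the ReadLimit it is passed.
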
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index dd456f2237..a2d0c9acaf 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -27,6 +27,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.BaseTable;
@@ -47,6 +48,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkCatalogTestBase;
import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
@@ -70,6 +72,8 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
private Table table;
+ private final AtomicInteger microBatches = new AtomicInteger();
+
/**
* test data to be used by multiple writes each write creates a snapshot and writes a list of
* records
@@ -115,6 +119,7 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
+ "PARTITIONED BY (bucket(3, id))",
tableName);
this.table = validationCatalog.loadTable(tableIdent);
+ microBatches.set(0);
}
@After
@@ -140,6 +145,57 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
}
+ @Test
+ public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfFiles_1()
+ throws Exception {
+ appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+
+ Assert.assertEquals(
+ 6,
+ microBatchCount(
+ ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")));
+ }
+
+ @Test
+ public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfFiles_2()
+ throws Exception {
+ appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+
+ Assert.assertEquals(
+ 3,
+ microBatchCount(
+ ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "2")));
+ }
+
+ @Test
+ public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfRows_1()
+ throws Exception {
+ appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+
+ // only 1 micro-batch will be formed and we will read data partially
+ Assert.assertEquals(
+ 1,
+ microBatchCount(ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "1")));
+
+ StreamingQuery query = startStream(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "1");
+
+ // check answer correctness; only 1 record is read and the micro-batch will be stuck
+ List<SimpleRecord> actual = rowsAvailable(query);
+ Assertions.assertThat(actual)
+ .containsExactlyInAnyOrderElementsOf(
+ Lists.newArrayList(TEST_DATA_MULTIPLE_SNAPSHOTS.get(0).get(0)));
+ }
+
+ @Test
+ public void testReadStreamOnIcebergTableWithMultipleSnapshots_WithNumberOfRows_4()
+ throws Exception {
+ appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+
+ Assert.assertEquals(
+ 2,
+ microBatchCount(ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "4")));
+ }
+
@Test
public void testReadStreamOnIcebergThenAddData() throws Exception {
List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
@@ -561,7 +617,25 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
}
private StreamingQuery startStream(String key, String value) throws TimeoutException {
- return startStream(ImmutableMap.of(key, value));
+ return startStream(
+ ImmutableMap.of(key, value, SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1"));
+ }
+
+ private int microBatchCount(Map<String, String> options) throws TimeoutException {
+ Dataset<Row> ds = spark.readStream().options(options).format("iceberg").load(tableName);
+
+ ds.writeStream()
+ .options(options)
+ .foreachBatch(
+ (VoidFunction2<Dataset<Row>, Long>)
+ (dataset, batchId) -> {
+ microBatches.getAndIncrement();
+ })
+ .start()
+ .processAllAvailable();
+
+ stopStreams();
+ return microBatches.get();
}
private List<SimpleRecord> rowsAvailable(StreamingQuery query) {