You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ja...@apache.org on 2023/04/23 14:33:08 UTC
[iceberg] branch master updated: Spark: Fix failing unit test (#7414)
This is an automated email from the ASF dual-hosted git repository.
jackye pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new bdcc55b731 Spark: Fix failing unit test (#7414)
bdcc55b731 is described below
commit bdcc55b7318f0f026d31024366910e4d1671cf7e
Author: Prashant Singh <35...@users.noreply.github.com>
AuthorDate: Sun Apr 23 07:32:59 2023 -0700
Spark: Fix failing unit test (#7414)
Co-authored-by: Prashant Singh <ps...@amazon.com>
---
.../iceberg/spark/source/SparkMicroBatchStream.java | 18 ++++++++++++------
1 file changed, 12 insertions(+), 6 deletions(-)
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index bb419e9951..e1a8ef5912 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -223,12 +223,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissio
Snapshot snapshot = table.snapshot(currentOffset.snapshotId());
- if (snapshot == null) {
- throw new IllegalStateException(
- String.format(
- "Cannot load current offset at snapshot %d, the snapshot was expired or removed",
- currentOffset.snapshotId()));
- }
+ validateCurrentSnapshotExists(snapshot, currentOffset);
if (!shouldProcess(snapshot)) {
LOG.debug("Skipping snapshot: {} of table {}", currentOffset.snapshotId(), table.name());
@@ -340,6 +335,8 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissio
}
Snapshot curSnapshot = table.snapshot(startingOffset.snapshotId());
+ validateCurrentSnapshotExists(curSnapshot, startingOffset);
+
int startPosOfSnapOffset = (int) startingOffset.position();
boolean scanAllFiles = startingOffset.shouldScanAllFiles();
@@ -419,6 +416,15 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissio
: addedFilesCount;
}
+ private void validateCurrentSnapshotExists(Snapshot snapshot, StreamingOffset currentOffset) { // Shared guard: throws IllegalStateException when the snapshot looked up for the given offset is null; otherwise does nothing.
+ if (snapshot == null) { // callers in this patch pass the result of table.snapshot(<offset>.snapshotId()), so null means that lookup found no snapshot
+ throw new IllegalStateException(
+ String.format(
+ "Cannot load current offset at snapshot %d, the snapshot was expired or removed",
+ currentOffset.snapshotId())); // the id carried by the offset is reported so the missing snapshot can be identified
+ }
+ }
+
@Override
public ReadLimit getDefaultReadLimit() {
if (maxFilesPerMicroBatch != Integer.MAX_VALUE