You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ja...@apache.org on 2023/04/23 14:33:08 UTC

[iceberg] branch master updated: Spark: Fix failing unit test (#7414)

This is an automated email from the ASF dual-hosted git repository.

jackye pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new bdcc55b731 Spark: Fix failing unit test (#7414)
bdcc55b731 is described below

commit bdcc55b7318f0f026d31024366910e4d1671cf7e
Author: Prashant Singh <35...@users.noreply.github.com>
AuthorDate: Sun Apr 23 07:32:59 2023 -0700

    Spark: Fix failing unit test (#7414)
    
    Co-authored-by: Prashant Singh <ps...@amazon.com>
---
 .../iceberg/spark/source/SparkMicroBatchStream.java    | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)

diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index bb419e9951..e1a8ef5912 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -223,12 +223,7 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissio
 
       Snapshot snapshot = table.snapshot(currentOffset.snapshotId());
 
-      if (snapshot == null) {
-        throw new IllegalStateException(
-            String.format(
-                "Cannot load current offset at snapshot %d, the snapshot was expired or removed",
-                currentOffset.snapshotId()));
-      }
+      validateCurrentSnapshotExists(snapshot, currentOffset);
 
       if (!shouldProcess(snapshot)) {
         LOG.debug("Skipping snapshot: {} of table {}", currentOffset.snapshotId(), table.name());
@@ -340,6 +335,8 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissio
     }
 
     Snapshot curSnapshot = table.snapshot(startingOffset.snapshotId());
+    validateCurrentSnapshotExists(curSnapshot, startingOffset);
+
     int startPosOfSnapOffset = (int) startingOffset.position();
 
     boolean scanAllFiles = startingOffset.shouldScanAllFiles();
@@ -419,6 +416,15 @@ public class SparkMicroBatchStream implements MicroBatchStream, SupportsAdmissio
         : addedFilesCount;
   }
 
+  private void validateCurrentSnapshotExists(Snapshot snapshot, StreamingOffset currentOffset) {
+    if (snapshot == null) { // no snapshot resolved for the offset's snapshot id
+      throw new IllegalStateException(
+          String.format(
+              "Cannot load current offset at snapshot %d, the snapshot was expired or removed",
+              currentOffset.snapshotId())); // NOTE(review): also called with a starting offset
+    }
+  }
+
   @Override
   public ReadLimit getDefaultReadLimit() {
     if (maxFilesPerMicroBatch != Integer.MAX_VALUE