Posted to commits@iceberg.apache.org by fo...@apache.org on 2023/04/25 08:02:00 UTC

[iceberg] branch master updated: Spark: Backport #6480 to Spark 3.2 and Spark 3.1 (#7425)

This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new f917e2800e Spark: Backport #6480 to Spark 3.2 and Spark 3.1 (#7425)
f917e2800e is described below

commit f917e2800e93b19cd96b2e8bdd5b59d04fc815f6
Author: Amogh Jahagirdar <ja...@amazon.com>
AuthorDate: Tue Apr 25 01:01:53 2023 -0700

    Spark: Backport #6480 to Spark 3.2 and Spark 3.1 (#7425)
---
 .../spark/source/SparkMicroBatchStream.java        |  9 +++++
 .../spark/source/TestStructuredStreamingRead3.java | 42 ++++++++++++++++++++++
 .../spark/source/SparkMicroBatchStream.java        |  9 +++++
 .../spark/source/TestStructuredStreamingRead3.java | 42 ++++++++++++++++++++++
 4 files changed, 102 insertions(+)

diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index 5346b5267c..1ed3b64f02 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -200,6 +200,15 @@ public class SparkMicroBatchStream implements MicroBatchStream {
         currentOffset = new StreamingOffset(snapshotAfter.snapshotId(), 0L, false);
       }
 
+      Snapshot snapshot = table.snapshot(currentOffset.snapshotId());
+
+      if (snapshot == null) {
+        throw new IllegalStateException(
+            String.format(
+                "Cannot load current offset at snapshot %d, the snapshot was expired or removed",
+                currentOffset.snapshotId()));
+      }
+
       if (!shouldProcess(table.snapshot(currentOffset.snapshotId()))) {
         LOG.debug("Skipping snapshot: {} of table {}", currentOffset.snapshotId(), table.name());
         continue;
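
For context, a sketch that is not part of this commit: once a query trips the new guard, its checkpoint permanently references a snapshot that no longer exists, so the usual recovery is to start a fresh query with a new checkpoint location and an explicit starting point. A minimal sketch, assuming Iceberg's documented "stream-from-timestamp" read option and illustrative table/path names:

    import java.util.concurrent.TimeoutException;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.streaming.StreamingQuery;

    public class RestartFromTimestamp {
      public static void main(String[] args) throws TimeoutException {
        SparkSession spark = SparkSession.builder().getOrCreate();

        // Resume from commits made after a chosen point in time (millis since
        // epoch) instead of the checkpointed offset that can no longer resolve.
        long startMillis = System.currentTimeMillis() - 60 * 60 * 1000L; // e.g. 1 hour ago

        Dataset<Row> source =
            spark
                .readStream()
                .format("iceberg")
                .option("stream-from-timestamp", String.valueOf(startMillis))
                .load("db.table"); // illustrative table name

        StreamingQuery query =
            source
                .writeStream()
                .format("parquet")
                .option("checkpointLocation", "/tmp/fresh-checkpoint") // must be a new location
                .option("path", "/tmp/output")
                .start();

        query.processAllAvailable();
        query.stop();
      }
    }
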
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index 23fdfb09cb..dd456f2237 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -19,8 +19,10 @@
 package org.apache.iceberg.spark.source;
 
 import static org.apache.iceberg.expressions.Expressions.ref;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -323,6 +325,46 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
     }
   }
 
+  @Test
+  public void testFailReadingCheckpointInvalidSnapshot() throws IOException, TimeoutException {
+    File writerCheckpointFolder = temp.newFolder("writer-checkpoint-folder");
+    File writerCheckpoint = new File(writerCheckpointFolder, "writer-checkpoint");
+    File output = temp.newFolder();
+
+    DataStreamWriter querySource =
+        spark
+            .readStream()
+            .format("iceberg")
+            .load(tableName)
+            .writeStream()
+            .option("checkpointLocation", writerCheckpoint.toString())
+            .format("parquet")
+            .queryName("checkpoint_test")
+            .option("path", output.getPath());
+
+    List<SimpleRecord> firstSnapshotRecordList = Lists.newArrayList(new SimpleRecord(1, "one"));
+    List<SimpleRecord> secondSnapshotRecordList = Lists.newArrayList(new SimpleRecord(2, "two"));
+    StreamingQuery startQuery = querySource.start();
+
+    appendData(firstSnapshotRecordList);
+    table.refresh();
+    long firstSnapshotid = table.currentSnapshot().snapshotId();
+    startQuery.processAllAvailable();
+    startQuery.stop();
+
+    appendData(secondSnapshotRecordList);
+
+    table.expireSnapshots().expireSnapshotId(firstSnapshotid).commit();
+
+    StreamingQuery restartedQuery = querySource.start();
+    assertThatThrownBy(restartedQuery::processAllAvailable)
+        .hasCauseInstanceOf(IllegalStateException.class)
+        .hasMessageContaining(
+            String.format(
+                "Cannot load current offset at snapshot %d, the snapshot was expired or removed",
+                firstSnapshotid));
+  }
+
   @Test
   public void testParquetOrcAvroDataInOneTable() throws Exception {
     List<SimpleRecord> parquetFileRecords =
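
The new test forces the failure by expiring one specific snapshot id; in production the same situation usually comes from routine time-based snapshot maintenance. A minimal sketch of that maintenance, assuming a loaded org.apache.iceberg.Table handle (cutoff and retention values are illustrative):

    import java.util.concurrent.TimeUnit;
    import org.apache.iceberg.Table;

    class ExpireOldSnapshots {
      static void expire(Table table) {
        // Drop snapshots older than 7 days while always keeping the 5 most
        // recent; commit() applies the expiration to the table metadata.
        long cutoff = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(7);
        table
            .expireSnapshots()
            .expireOlderThan(cutoff)
            .retainLast(5)
            .commit();
      }
    }

Any stream whose checkpoint still points at an expired snapshot will then hit the new IllegalStateException on restart, which is the behavior the test pins down.
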
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index 972988b6b2..816e3d2bf8 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -208,6 +208,15 @@ public class SparkMicroBatchStream implements MicroBatchStream {
         currentOffset = new StreamingOffset(snapshotAfter.snapshotId(), 0L, false);
       }
 
+      Snapshot snapshot = table.snapshot(currentOffset.snapshotId());
+
+      if (snapshot == null) {
+        throw new IllegalStateException(
+            String.format(
+                "Cannot load current offset at snapshot %d, the snapshot was expired or removed",
+                currentOffset.snapshotId()));
+      }
+
       if (!shouldProcess(table.snapshot(currentOffset.snapshotId()))) {
         LOG.debug("Skipping snapshot: {} of table {}", currentOffset.snapshotId(), table.name());
         continue;
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index 23fdfb09cb..dd456f2237 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -19,8 +19,10 @@
 package org.apache.iceberg.spark.source;
 
 import static org.apache.iceberg.expressions.Expressions.ref;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -323,6 +325,46 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
     }
   }
 
+  @Test
+  public void testFailReadingCheckpointInvalidSnapshot() throws IOException, TimeoutException {
+    File writerCheckpointFolder = temp.newFolder("writer-checkpoint-folder");
+    File writerCheckpoint = new File(writerCheckpointFolder, "writer-checkpoint");
+    File output = temp.newFolder();
+
+    DataStreamWriter querySource =
+        spark
+            .readStream()
+            .format("iceberg")
+            .load(tableName)
+            .writeStream()
+            .option("checkpointLocation", writerCheckpoint.toString())
+            .format("parquet")
+            .queryName("checkpoint_test")
+            .option("path", output.getPath());
+
+    List<SimpleRecord> firstSnapshotRecordList = Lists.newArrayList(new SimpleRecord(1, "one"));
+    List<SimpleRecord> secondSnapshotRecordList = Lists.newArrayList(new SimpleRecord(2, "two"));
+    StreamingQuery startQuery = querySource.start();
+
+    appendData(firstSnapshotRecordList);
+    table.refresh();
+    long firstSnapshotid = table.currentSnapshot().snapshotId();
+    startQuery.processAllAvailable();
+    startQuery.stop();
+
+    appendData(secondSnapshotRecordList);
+
+    table.expireSnapshots().expireSnapshotId(firstSnapshotid).commit();
+
+    StreamingQuery restartedQuery = querySource.start();
+    assertThatThrownBy(restartedQuery::processAllAvailable)
+        .hasCauseInstanceOf(IllegalStateException.class)
+        .hasMessageContaining(
+            String.format(
+                "Cannot load current offset at snapshot %d, the snapshot was expired or removed",
+                firstSnapshotid));
+  }
+
   @Test
   public void testParquetOrcAvroDataInOneTable() throws Exception {
     List<SimpleRecord> parquetFileRecords =