You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by fo...@apache.org on 2023/04/25 08:02:00 UTC
[iceberg] branch master updated: Spark: Backport #6480 to Spark 3.2 and Spark 3.1 (#7425)
This is an automated email from the ASF dual-hosted git repository.
fokko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new f917e2800e Spark: Backport #6480 to Spark 3.2 and Spark 3.1 (#7425)
f917e2800e is described below
commit f917e2800e93b19cd96b2e8bdd5b59d04fc815f6
Author: Amogh Jahagirdar <ja...@amazon.com>
AuthorDate: Tue Apr 25 01:01:53 2023 -0700
Spark: Backport #6480 to Spark 3.2 and Spark 3.1 (#7425)
---
.../spark/source/SparkMicroBatchStream.java | 9 +++++
.../spark/source/TestStructuredStreamingRead3.java | 42 ++++++++++++++++++++++
.../spark/source/SparkMicroBatchStream.java | 9 +++++
.../spark/source/TestStructuredStreamingRead3.java | 42 ++++++++++++++++++++++
4 files changed, 102 insertions(+)
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index 5346b5267c..1ed3b64f02 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -200,6 +200,15 @@ public class SparkMicroBatchStream implements MicroBatchStream {
currentOffset = new StreamingOffset(snapshotAfter.snapshotId(), 0L, false);
}
+ Snapshot snapshot = table.snapshot(currentOffset.snapshotId());
+
+ if (snapshot == null) {
+ throw new IllegalStateException(
+ String.format(
+ "Cannot load current offset at snapshot %d, the snapshot was expired or removed",
+ currentOffset.snapshotId()));
+ }
+
if (!shouldProcess(table.snapshot(currentOffset.snapshotId()))) {
LOG.debug("Skipping snapshot: {} of table {}", currentOffset.snapshotId(), table.name());
continue;
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index 23fdfb09cb..dd456f2237 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -19,8 +19,10 @@
package org.apache.iceberg.spark.source;
import static org.apache.iceberg.expressions.Expressions.ref;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
import java.io.File;
+import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -323,6 +325,46 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
}
}
+ @Test
+ public void testFailReadingCheckpointInvalidSnapshot() throws IOException, TimeoutException {
+ File writerCheckpointFolder = temp.newFolder("writer-checkpoint-folder");
+ File writerCheckpoint = new File(writerCheckpointFolder, "writer-checkpoint");
+ File output = temp.newFolder();
+
+ DataStreamWriter querySource =
+ spark
+ .readStream()
+ .format("iceberg")
+ .load(tableName)
+ .writeStream()
+ .option("checkpointLocation", writerCheckpoint.toString())
+ .format("parquet")
+ .queryName("checkpoint_test")
+ .option("path", output.getPath());
+
+ List<SimpleRecord> firstSnapshotRecordList = Lists.newArrayList(new SimpleRecord(1, "one"));
+ List<SimpleRecord> secondSnapshotRecordList = Lists.newArrayList(new SimpleRecord(2, "two"));
+ StreamingQuery startQuery = querySource.start();
+
+ appendData(firstSnapshotRecordList);
+ table.refresh();
+ long firstSnapshotid = table.currentSnapshot().snapshotId();
+ startQuery.processAllAvailable();
+ startQuery.stop();
+
+ appendData(secondSnapshotRecordList);
+
+ table.expireSnapshots().expireSnapshotId(firstSnapshotid).commit();
+
+ StreamingQuery restartedQuery = querySource.start();
+ assertThatThrownBy(restartedQuery::processAllAvailable)
+ .hasCauseInstanceOf(IllegalStateException.class)
+ .hasMessageContaining(
+ String.format(
+ "Cannot load current offset at snapshot %d, the snapshot was expired or removed",
+ firstSnapshotid));
+ }
+
@Test
public void testParquetOrcAvroDataInOneTable() throws Exception {
List<SimpleRecord> parquetFileRecords =
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index 972988b6b2..816e3d2bf8 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -208,6 +208,15 @@ public class SparkMicroBatchStream implements MicroBatchStream {
currentOffset = new StreamingOffset(snapshotAfter.snapshotId(), 0L, false);
}
+ Snapshot snapshot = table.snapshot(currentOffset.snapshotId());
+
+ if (snapshot == null) {
+ throw new IllegalStateException(
+ String.format(
+ "Cannot load current offset at snapshot %d, the snapshot was expired or removed",
+ currentOffset.snapshotId()));
+ }
+
if (!shouldProcess(table.snapshot(currentOffset.snapshotId()))) {
LOG.debug("Skipping snapshot: {} of table {}", currentOffset.snapshotId(), table.name());
continue;
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index 23fdfb09cb..dd456f2237 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -19,8 +19,10 @@
package org.apache.iceberg.spark.source;
import static org.apache.iceberg.expressions.Expressions.ref;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
import java.io.File;
+import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -323,6 +325,46 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
}
}
+ @Test
+ public void testFailReadingCheckpointInvalidSnapshot() throws IOException, TimeoutException {
+ File writerCheckpointFolder = temp.newFolder("writer-checkpoint-folder");
+ File writerCheckpoint = new File(writerCheckpointFolder, "writer-checkpoint");
+ File output = temp.newFolder();
+
+ DataStreamWriter querySource =
+ spark
+ .readStream()
+ .format("iceberg")
+ .load(tableName)
+ .writeStream()
+ .option("checkpointLocation", writerCheckpoint.toString())
+ .format("parquet")
+ .queryName("checkpoint_test")
+ .option("path", output.getPath());
+
+ List<SimpleRecord> firstSnapshotRecordList = Lists.newArrayList(new SimpleRecord(1, "one"));
+ List<SimpleRecord> secondSnapshotRecordList = Lists.newArrayList(new SimpleRecord(2, "two"));
+ StreamingQuery startQuery = querySource.start();
+
+ appendData(firstSnapshotRecordList);
+ table.refresh();
+ long firstSnapshotid = table.currentSnapshot().snapshotId();
+ startQuery.processAllAvailable();
+ startQuery.stop();
+
+ appendData(secondSnapshotRecordList);
+
+ table.expireSnapshots().expireSnapshotId(firstSnapshotid).commit();
+
+ StreamingQuery restartedQuery = querySource.start();
+ assertThatThrownBy(restartedQuery::processAllAvailable)
+ .hasCauseInstanceOf(IllegalStateException.class)
+ .hasMessageContaining(
+ String.format(
+ "Cannot load current offset at snapshot %d, the snapshot was expired or removed",
+ firstSnapshotid));
+ }
+
@Test
public void testParquetOrcAvroDataInOneTable() throws Exception {
List<SimpleRecord> parquetFileRecords =