Posted to commits@iceberg.apache.org by ru...@apache.org on 2022/01/21 20:00:28 UTC

[iceberg] branch master updated: Spark: Backport Streaming Test Refactors (#3948)

This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d6b6b4  Spark: Backport Streaming Test Refactors (#3948)
4d6b6b4 is described below

commit 4d6b6b41afc959b4737306ebb07f30d1bdc54767
Author: Russell Spitzer <ru...@GMAIL.COM>
AuthorDate: Fri Jan 21 14:00:19 2022 -0600

    Spark: Backport Streaming Test Refactors (#3948)
    
    Back-porting test refactor from #3775
---
 .../spark/source/TestStructuredStreamingRead3.java | 367 +++++++++------------
 .../spark/source/TestStructuredStreamingRead3.java | 367 +++++++++------------
 2 files changed, 324 insertions(+), 410 deletions(-)
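
For context on the diff below: the refactor replaces the per-test spark.readStream()/writeStream() boilerplate with shared startStream(...) and rowsAvailable(...) helpers that pipe the Iceberg source into a named in-memory sink. The following is a minimal, self-contained sketch of that pattern, not the committed code itself; the StreamingReadHelpers class name, its constructor, and the inlined SimpleRecord bean are illustrative stand-ins for fixtures the real tests inherit from SparkCatalogTestBase.

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;

// Minimal sketch of the shared streaming-test helpers this refactor introduces.
// The SparkSession, table name, and SimpleRecord bean are assumed fixtures here;
// in the real tests they come from SparkCatalogTestBase and the test setup.
class StreamingReadHelpers {
  private static final String MEMORY_TABLE = "_stream_view_mem";

  private final SparkSession spark;
  private final String tableName;

  StreamingReadHelpers(SparkSession spark, String tableName) {
    this.spark = spark;
    this.tableName = tableName;
  }

  // Start a streaming read of the Iceberg table and write it to a named memory sink.
  StreamingQuery startStream(Map<String, String> options) throws TimeoutException {
    return spark.readStream()
        .options(options)            // read options, e.g. stream-from-timestamp
        .format("iceberg")
        .load(tableName)
        .writeStream()
        .format("memory")
        .queryName(MEMORY_TABLE)
        .outputMode(OutputMode.Append())
        .start();
  }

  StreamingQuery startStream() throws TimeoutException {
    return startStream(Collections.emptyMap());
  }

  // Process everything the source currently has, then read it back from the sink.
  List<SimpleRecord> rowsAvailable(StreamingQuery query) {
    query.processAllAvailable();
    return spark.sql("select * from " + MEMORY_TABLE)
        .as(Encoders.bean(SimpleRecord.class))
        .collectAsList();
  }

  // Simple id/data bean matching the rows the tests append.
  public static class SimpleRecord {
    private Integer id;
    private String data;

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }
    public String getData() { return data; }
    public void setData(String data) { this.data = data; }
  }
}

In the tests below, startStream() is typically given SparkReadOptions.STREAM_FROM_TIMESTAMP to exercise timestamp-based starting points, and rowsAvailable() drains the stream and reads back whatever it has processed so far.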

diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index d5bb1a1..145cf78 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -24,14 +24,13 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeoutException;
-import org.apache.hadoop.conf.Configuration;
+import java.util.stream.IntStream;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.DataOperations;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
@@ -40,19 +39,17 @@ import org.apache.iceberg.data.FileHelpers;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkCatalogTestBase;
 import org.apache.iceberg.spark.SparkReadOptions;
-import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.streaming.DataStreamWriter;
 import org.apache.spark.sql.streaming.OutputMode;
 import org.apache.spark.sql.streaming.StreamingQuery;
-import org.apache.spark.sql.streaming.StreamingQueryException;
-import org.apache.spark.sql.streaming.Trigger;
 import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Assert;
@@ -62,7 +59,6 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import static org.apache.iceberg.expressions.Expressions.ref;
-import static org.apache.iceberg.types.Types.NestedField.optional;
 
 @RunWith(Parameterized.class)
 public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
@@ -72,13 +68,6 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
   }
 
   private Table table;
-  private String tableIdentifier;
-
-  private static final Configuration CONF = new Configuration();
-  private static final Schema SCHEMA = new Schema(
-      optional(1, "id", Types.IntegerType.get()),
-      optional(2, "data", Types.StringType.get())
-  );
 
   /**
    * test data to be used by multiple writes
@@ -135,7 +124,6 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
         "USING iceberg " +
         "PARTITIONED BY (bucket(3, id))", tableName);
     this.table = validationCatalog.loadTable(tableIdent);
-    this.tableIdentifier = tableName;
   }
 
   @After
@@ -150,19 +138,26 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
     sql("DROP TABLE IF EXISTS %s", tableName);
   }
 
-  @SuppressWarnings("unchecked")
   @Test
   public void testReadStreamOnIcebergTableWithMultipleSnapshots() throws Exception {
     List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
-    appendDataAsMultipleSnapshots(expected, tableIdentifier);
+    appendDataAsMultipleSnapshots(expected);
 
-    table.refresh();
+    StreamingQuery query = startStream();
 
-    Dataset<Row> df = spark.readStream()
-        .format("iceberg")
-        .load(tableIdentifier);
-    List<SimpleRecord> actual = processAvailable(df);
+    List<SimpleRecord> actual = rowsAvailable(query);
+    Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
+  }
+
+  @Test
+  public void testReadStreamOnIcebergThenAddData() throws Exception {
+    List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
+
+    StreamingQuery query = startStream();
 
+    appendDataAsMultipleSnapshots(expected);
+
+    List<SimpleRecord> actual = rowsAvailable(query);
     Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
   }
 
@@ -172,76 +167,93 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
         new SimpleRecord(-2, "minustwo"),
         new SimpleRecord(-1, "minusone"),
         new SimpleRecord(0, "zero"));
-    appendData(dataBeforeTimestamp, tableIdentifier, "parquet");
+
+    appendData(dataBeforeTimestamp);
 
     table.refresh();
     long streamStartTimestamp = table.currentSnapshot().timestampMillis() + 1;
-    waitUntilAfter(streamStartTimestamp);
 
-    List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
-    appendDataAsMultipleSnapshots(expected, tableIdentifier);
+    StreamingQuery query = startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(streamStartTimestamp));
 
-    table.refresh();
+    List<SimpleRecord> empty = rowsAvailable(query);
+    Assertions.assertThat(empty.isEmpty()).isTrue();
 
-    Dataset<Row> df = spark.readStream()
-        .format("iceberg")
-        .option(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(streamStartTimestamp))
-        .load(tableIdentifier);
-    List<SimpleRecord> actual = processAvailable(df);
+    List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
+    appendDataAsMultipleSnapshots(expected);
+
+    List<SimpleRecord> actual = rowsAvailable(query);
 
     Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
   }
 
   @Test
-  public void testReadingStreamAfterLatestTimestamp() throws Exception {
-    List<SimpleRecord> dataBeforeTimestamp = Lists.newArrayList(
+  public void testReadingStreamFromFutureTimestamp() throws Exception {
+    long futureTimestamp = System.currentTimeMillis() + 10000;
+
+    StreamingQuery query = startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(futureTimestamp));
+
+    List<SimpleRecord> actual = rowsAvailable(query);
+    Assertions.assertThat(actual.isEmpty()).isTrue();
+
+    List<SimpleRecord> data = Lists.newArrayList(
         new SimpleRecord(-2, "minustwo"),
         new SimpleRecord(-1, "minusone"),
         new SimpleRecord(0, "zero"));
-    appendData(dataBeforeTimestamp, tableIdentifier, "parquet");
 
-    table.refresh();
-    long streamStartTimestamp = table.currentSnapshot().timestampMillis() + 1;
-    waitUntilAfter(streamStartTimestamp);
+    // Perform several inserts that should not show up because the fromTimestamp has not passed yet
+    IntStream.range(0, 3).forEach(x -> {
+      appendData(data);
+      Assertions.assertThat(rowsAvailable(query).isEmpty()).isTrue();
+    });
 
-    Dataset<Row> df = spark.readStream()
-        .format("iceberg")
-        .option(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(streamStartTimestamp))
-        .load(tableIdentifier);
-    List<SimpleRecord> actual = processAvailable(df);
-    Assertions.assertThat(actual.isEmpty()).isTrue();
+    waitUntilAfter(futureTimestamp);
+
+    // Data appended after the timestamp should appear
+    appendData(data);
+    actual = rowsAvailable(query);
+    Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(data);
   }
 
   @Test
-  public void testReadingStreamFromTimestampStartWithExistingTimestamp() throws Exception {
+  public void testReadingStreamFromTimestampFutureWithExistingSnapshots() throws Exception {
     List<SimpleRecord> dataBeforeTimestamp = Lists.newArrayList(
-        new SimpleRecord(-2, "minustwo"),
-        new SimpleRecord(-1, "minusone"),
-        new SimpleRecord(0, "zero"));
-    appendData(dataBeforeTimestamp, tableIdentifier, "parquet");
+        new SimpleRecord(1, "one"),
+        new SimpleRecord(2, "two"),
+        new SimpleRecord(3, "three"));
+    appendData(dataBeforeTimestamp);
 
-    table.refresh();
+    long streamStartTimestamp = System.currentTimeMillis() + 2000;
+
+    // Start the stream with a future timestamp after the current snapshot
+    StreamingQuery query = startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(streamStartTimestamp));
+    List<SimpleRecord> actual = rowsAvailable(query);
+    Assert.assertEquals(Collections.emptyList(), actual);
+
+    // Stream should contain data added after the timestamp elapses
+    waitUntilAfter(streamStartTimestamp);
+    List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
+    appendDataAsMultipleSnapshots(expected);
+    Assertions.assertThat(rowsAvailable(query)).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
+  }
+
+  @Test
+  public void testReadingStreamFromTimestampOfExistingSnapshot() throws Exception {
     List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
 
-    // Append the first expected data
-    appendData(expected.get(0), tableIdentifier, "parquet");
+    // Create an existing snapshot with some data
+    appendData(expected.get(0));
     table.refresh();
-    long streamStartTimestamp = table.currentSnapshot().timestampMillis();
+    long firstSnapshotTime = table.currentSnapshot().timestampMillis();
 
-    // Start stream
-    Dataset<Row> df = spark.readStream()
-        .format("iceberg")
-        .option(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(streamStartTimestamp))
-        .load(tableIdentifier);
+    // Start stream giving the first Snapshot's time as the start point
+    StreamingQuery stream = startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(firstSnapshotTime));
 
     // Append rest of expected data
     for (int i = 1; i < expected.size(); i++) {
-      appendData(expected.get(i), tableIdentifier, "parquet");
+      appendData(expected.get(i));
     }
 
-    table.refresh();
-    List<SimpleRecord> actual = processAvailable(df);
-
+    List<SimpleRecord> actual = rowsAvailable(stream);
     Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
   }
 
@@ -256,97 +268,65 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
     List<SimpleRecord> thirdSnapshotRecordList = Lists.newArrayList(
         new SimpleRecord(3, "three"));
 
-    List<SimpleRecord> expectedRecordList = Lists.newArrayList(
-        new SimpleRecord(2, "two"),
-        new SimpleRecord(3, "three"));
+    List<SimpleRecord> expectedRecordList = Lists.newArrayList();
+    expectedRecordList.addAll(secondSnapshotRecordList);
+    expectedRecordList.addAll(thirdSnapshotRecordList);
 
-    appendData(firstSnapshotRecordList, tableIdentifier, "parquet");
+    appendData(firstSnapshotRecordList);
     table.refresh();
-    Snapshot firstSnapshot = table.currentSnapshot();
-
-    Dataset<Row> df = spark.readStream()
-        .format("iceberg")
-        .option(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(firstSnapshot.timestampMillis()))
-        .load(tableIdentifier);
+    long firstSnapshotid = table.currentSnapshot().snapshotId();
+    long firstSnapshotCommitTime = table.currentSnapshot().timestampMillis();
 
-    appendData(secondSnapshotRecordList, tableIdentifier, "parquet");
-    table.refresh();
 
-    appendData(thirdSnapshotRecordList, tableIdentifier, "parquet");
-    table.refresh();
+    appendData(secondSnapshotRecordList);
+    appendData(thirdSnapshotRecordList);
 
-    table.expireSnapshots().expireSnapshotId(firstSnapshot.snapshotId()).commit();
-    table.refresh();
+    table.expireSnapshots().expireSnapshotId(firstSnapshotid).commit();
 
-    List<SimpleRecord> actual = processAvailable(df);
-    Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expectedRecordList));
+    StreamingQuery query = startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, String.valueOf(firstSnapshotCommitTime));
+    List<SimpleRecord> actual = rowsAvailable(query);
+    Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(expectedRecordList);
   }
 
   @Test
-  public void testReadingStreamFromTimestampGreaterThanLatestSnapshotTime() throws Exception {
-    List<SimpleRecord> dataBeforeTimestamp = Lists.newArrayList(
-        new SimpleRecord(1, "one"),
-        new SimpleRecord(2, "two"),
-        new SimpleRecord(3, "three"));
-    appendData(dataBeforeTimestamp, tableIdentifier, "parquet");
-
-    table.refresh();
-    long streamStartTimestamp = table.currentSnapshot().timestampMillis() + 1;
-    waitUntilAfter(streamStartTimestamp);
-
-    // Test stream with Timestamp > Latest Snapshot Time
-    Dataset<Row> df = spark.readStream()
-        .format("iceberg")
-        .option(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(streamStartTimestamp))
-        .load(tableIdentifier);
-    List<SimpleRecord> actual = processAvailable(df);
-    Assert.assertEquals(Collections.emptyList(), actual);
-
-    // Test stream after new data is added
-    List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
-    appendDataAsMultipleSnapshots(expected, tableIdentifier);
-    table.refresh();
-    Assertions.assertThat(processAvailable(df, "newdata"))
-        .containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
-  }
-
-  @SuppressWarnings("unchecked")
-  @Test
   public void testResumingStreamReadFromCheckpoint() throws Exception {
     File writerCheckpointFolder = temp.newFolder("writer-checkpoint-folder");
     File writerCheckpoint = new File(writerCheckpointFolder, "writer-checkpoint");
-    final String tempView = "microBatchView";
+    File output = temp.newFolder();
 
-    Dataset<Row> df = spark.readStream()
+    DataStreamWriter querySource = spark.readStream()
         .format("iceberg")
-        .load(tableIdentifier);
-
-    // Trigger.Once with the combination of StreamingQuery.awaitTermination, which succeeds after this code
-    // will result in stopping the stream.
-    // This is how Stream STOP and RESUME is simulated in this Test Case.
-    DataStreamWriter<Row> singleBatchWriter = df.writeStream()
-        .trigger(Trigger.Once())
+        .load(tableName)
+        .writeStream()
         .option("checkpointLocation", writerCheckpoint.toString())
-        .foreachBatch((batchDF, batchId) -> {
-          batchDF.createOrReplaceGlobalTempView(tempView);
-        });
-
-    String globalTempView = "global_temp." + tempView;
+        .format("parquet")
+        .queryName("checkpoint_test")
+        .option("path", output.getPath());
 
-    List<SimpleRecord> processStreamOnEmptyIcebergTable = processMicroBatch(singleBatchWriter, globalTempView);
-    Assert.assertEquals(Collections.emptyList(), processStreamOnEmptyIcebergTable);
+    StreamingQuery startQuery = querySource.start();
+    startQuery.processAllAvailable();
+    startQuery.stop();
 
+    List<SimpleRecord> expected = Lists.newArrayList();
     for (List<List<SimpleRecord>> expectedCheckpoint : TEST_DATA_MULTIPLE_WRITES_MULTIPLE_SNAPSHOTS) {
-      appendDataAsMultipleSnapshots(expectedCheckpoint, tableIdentifier);
-      table.refresh();
-
-      List<SimpleRecord> actualDataInCurrentMicroBatch = processMicroBatch(singleBatchWriter, globalTempView);
-      Assertions.assertThat(actualDataInCurrentMicroBatch)
-          .containsExactlyInAnyOrderElementsOf(Iterables.concat(expectedCheckpoint));
+      // New data was added while the stream was down
+      appendDataAsMultipleSnapshots(expectedCheckpoint);
+      expected.addAll(Lists.newArrayList(Iterables.concat(Iterables.concat(expectedCheckpoint))));
+
+      // Stream starts up again from the checkpoint, reads the newly added data, and shuts down
+      StreamingQuery restartedQuery = querySource.start();
+      restartedQuery.processAllAvailable();
+      restartedQuery.stop();
+
+      // Read data added by the stream
+      List<SimpleRecord> actual = spark.read()
+          .load(output.getPath())
+          .as(Encoders.bean(SimpleRecord.class))
+          .collectAsList();
+      Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
     }
   }
 
-  @SuppressWarnings("unchecked")
   @Test
   public void testParquetOrcAvroDataInOneTable() throws Exception {
     List<SimpleRecord> parquetFileRecords = Lists.newArrayList(
@@ -362,33 +342,24 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
         new SimpleRecord(6, "six"),
         new SimpleRecord(7, "seven"));
 
-    appendData(parquetFileRecords, tableIdentifier, "parquet");
-    appendData(orcFileRecords, tableIdentifier, "orc");
-    appendData(avroFileRecords, tableIdentifier, "avro");
+    appendData(parquetFileRecords);
+    appendData(orcFileRecords, "orc");
+    appendData(avroFileRecords, "avro");
 
     table.refresh();
 
-    Dataset<Row> ds = spark.readStream()
-        .format("iceberg")
-        .load(tableIdentifier);
-    Assertions.assertThat(processAvailable(ds))
+    StreamingQuery query = startStream();
+    Assertions.assertThat(rowsAvailable(query))
         .containsExactlyInAnyOrderElementsOf(Iterables.concat(parquetFileRecords, orcFileRecords, avroFileRecords));
   }
 
-  @SuppressWarnings("unchecked")
   @Test
   public void testReadStreamFromEmptyTable() throws Exception {
-    table.refresh();
-
-    Dataset<Row> df = spark.readStream()
-        .format("iceberg")
-        .load(tableIdentifier);
-
-    List<SimpleRecord> actual = processAvailable(df);
+    StreamingQuery stream = startStream();
+    List<SimpleRecord> actual = rowsAvailable(stream);
     Assert.assertEquals(Collections.emptyList(), actual);
   }
 
-  @SuppressWarnings("unchecked")
   @Test
   public void testReadStreamWithSnapshotTypeOverwriteErrorsOut() throws Exception {
     // upgrade table to version 2 - to facilitate creation of Snapshot of type OVERWRITE.
@@ -398,7 +369,7 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
 
     // fill table with some initial data
     List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
-    appendDataAsMultipleSnapshots(dataAcrossSnapshots, tableIdentifier);
+    appendDataAsMultipleSnapshots(dataAcrossSnapshots);
 
     Schema deleteRowSchema = table.schema().select("data");
     Record dataDelete = GenericRecord.create(deleteRowSchema);
@@ -416,29 +387,21 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
     // check pre-condition - that the above Delete file write - actually resulted in snapshot of type OVERWRITE
     Assert.assertEquals(DataOperations.OVERWRITE, table.currentSnapshot().operation());
 
-    Dataset<Row> df = spark.readStream()
-        .format("iceberg")
-        .load(tableIdentifier);
-    StreamingQuery streamingQuery = df.writeStream()
-        .format("memory")
-        .queryName("testtablewithoverwrites")
-        .outputMode(OutputMode.Append())
-        .start();
+    StreamingQuery query = startStream();
 
     AssertHelpers.assertThrowsCause(
         "Streaming should fail with IllegalStateException, as the snapshot is not of type APPEND",
         IllegalStateException.class,
         "Cannot process overwrite snapshot",
-        () -> streamingQuery.processAllAvailable()
+        () -> query.processAllAvailable()
     );
   }
 
-  @SuppressWarnings("unchecked")
   @Test
   public void testReadStreamWithSnapshotTypeReplaceIgnoresReplace() throws Exception {
     // fill table with some data
     List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
-    appendDataAsMultipleSnapshots(expected, tableIdentifier);
+    appendDataAsMultipleSnapshots(expected);
 
     table.refresh();
 
@@ -450,15 +413,11 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
     // check pre-condition
     Assert.assertEquals(DataOperations.REPLACE, table.currentSnapshot().operation());
 
-    Dataset<Row> df = spark.readStream()
-        .format("iceberg")
-        .load(tableIdentifier);
-
-    List<SimpleRecord> actual = processAvailable(df);
+    StreamingQuery query = startStream();
+    List<SimpleRecord> actual = rowsAvailable(query);
     Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
   }
 
-  @SuppressWarnings("unchecked")
   @Test
   public void testReadStreamWithSnapshotTypeDeleteErrorsOut() throws Exception {
     table.updateSpec()
@@ -470,7 +429,7 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
 
     // fill table with some data
     List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
-    appendDataAsMultipleSnapshots(dataAcrossSnapshots, tableIdentifier);
+    appendDataAsMultipleSnapshots(dataAcrossSnapshots);
 
     table.refresh();
 
@@ -483,24 +442,16 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
     table.refresh();
     Assert.assertEquals(DataOperations.DELETE, table.currentSnapshot().operation());
 
-    Dataset<Row> df = spark.readStream()
-        .format("iceberg")
-        .load(tableIdentifier);
-    StreamingQuery streamingQuery = df.writeStream()
-        .format("memory")
-        .queryName("testtablewithdelete")
-        .outputMode(OutputMode.Append())
-        .start();
+    StreamingQuery query = startStream();
 
     AssertHelpers.assertThrowsCause(
         "Streaming should fail with IllegalStateException, as the snapshot is not of type APPEND",
         IllegalStateException.class,
         "Cannot process delete snapshot",
-        () -> streamingQuery.processAllAvailable()
+        () -> query.processAllAvailable()
     );
   }
 
-  @SuppressWarnings("unchecked")
   @Test
   public void testReadStreamWithSnapshotTypeDeleteAndSkipDeleteOption() throws Exception {
     table.updateSpec()
@@ -512,7 +463,7 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
 
     // fill table with some data
     List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
-    appendDataAsMultipleSnapshots(dataAcrossSnapshots, tableIdentifier);
+    appendDataAsMultipleSnapshots(dataAcrossSnapshots);
 
     table.refresh();
 
@@ -525,56 +476,62 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
     table.refresh();
     Assert.assertEquals(DataOperations.DELETE, table.currentSnapshot().operation());
 
-    Dataset<Row> df = spark.readStream()
-        .format("iceberg")
-        .option(SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS, "true")
-        .load(tableIdentifier);
-    Assertions.assertThat(processAvailable(df))
+    StreamingQuery query = startStream(SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS, "true");
+    Assertions.assertThat(rowsAvailable(query))
         .containsExactlyInAnyOrderElementsOf(Iterables.concat(dataAcrossSnapshots));
   }
 
-  private static List<SimpleRecord> processMicroBatch(DataStreamWriter<Row> singleBatchWriter, String viewName)
-      throws TimeoutException, StreamingQueryException {
-    StreamingQuery streamingQuery = singleBatchWriter.start();
-    streamingQuery.awaitTermination();
-
-    return spark.sql(String.format("select * from %s", viewName))
-        .as(Encoders.bean(SimpleRecord.class))
-        .collectAsList();
-  }
-
   /**
    * appends each list as a Snapshot on the iceberg table at the given location.
    * accepts a list of lists - each list representing data per snapshot.
    */
-  private static void appendDataAsMultipleSnapshots(List<List<SimpleRecord>> data, String tableIdentifier) {
+  private void appendDataAsMultipleSnapshots(List<List<SimpleRecord>> data) {
     for (List<SimpleRecord> l : data) {
-      appendData(l, tableIdentifier, "parquet");
+      appendData(l);
     }
   }
 
-  private static void appendData(List<SimpleRecord> data, String tableIdentifier, String fileFormat) {
+  private void appendData(List<SimpleRecord> data) {
+    appendData(data, "parquet");
+  }
+
+  private void appendData(List<SimpleRecord> data, String format) {
     Dataset<Row> df = spark.createDataFrame(data, SimpleRecord.class);
     df.select("id", "data").write()
         .format("iceberg")
-        .option("write-format", fileFormat)
+        .option("write-format", format)
         .mode("append")
-        .save(tableIdentifier);
+        .save(tableName);
   }
 
-  private static List<SimpleRecord> processAvailable(Dataset<Row> df, String tableName) throws TimeoutException {
-    StreamingQuery streamingQuery = df.writeStream()
+  private static final String MEMORY_TABLE = "_stream_view_mem";
+
+  private StreamingQuery startStream(Map<String, String> options) throws TimeoutException {
+    return spark.readStream()
+        .options(options)
+        .format("iceberg")
+        .load(tableName)
+        .writeStream()
+        .options(options)
         .format("memory")
-        .queryName(tableName)
+        .queryName(MEMORY_TABLE)
         .outputMode(OutputMode.Append())
         .start();
-    streamingQuery.processAllAvailable();
-    return spark.sql("select * from " + tableName)
+  }
+
+  private StreamingQuery startStream() throws TimeoutException {
+    return startStream(Collections.emptyMap());
+  }
+
+  private StreamingQuery startStream(String key, String value) throws TimeoutException {
+    return startStream(ImmutableMap.of(key, value));
+  }
+
+  private List<SimpleRecord> rowsAvailable(StreamingQuery query) {
+    query.processAllAvailable();
+    return spark.sql("select * from " + MEMORY_TABLE)
         .as(Encoders.bean(SimpleRecord.class))
         .collectAsList();
   }
 
-  private static List<SimpleRecord> processAvailable(Dataset<Row> df) throws TimeoutException {
-    return processAvailable(df, "test12");
-  }
 }
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index d5bb1a1..145cf78 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -24,14 +24,13 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeoutException;
-import org.apache.hadoop.conf.Configuration;
+import java.util.stream.IntStream;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.DataOperations;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableOperations;
@@ -40,19 +39,17 @@ import org.apache.iceberg.data.FileHelpers;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkCatalogTestBase;
 import org.apache.iceberg.spark.SparkReadOptions;
-import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.streaming.DataStreamWriter;
 import org.apache.spark.sql.streaming.OutputMode;
 import org.apache.spark.sql.streaming.StreamingQuery;
-import org.apache.spark.sql.streaming.StreamingQueryException;
-import org.apache.spark.sql.streaming.Trigger;
 import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Assert;
@@ -62,7 +59,6 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import static org.apache.iceberg.expressions.Expressions.ref;
-import static org.apache.iceberg.types.Types.NestedField.optional;
 
 @RunWith(Parameterized.class)
 public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
@@ -72,13 +68,6 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
   }
 
   private Table table;
-  private String tableIdentifier;
-
-  private static final Configuration CONF = new Configuration();
-  private static final Schema SCHEMA = new Schema(
-      optional(1, "id", Types.IntegerType.get()),
-      optional(2, "data", Types.StringType.get())
-  );
 
   /**
    * test data to be used by multiple writes
@@ -135,7 +124,6 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
         "USING iceberg " +
         "PARTITIONED BY (bucket(3, id))", tableName);
     this.table = validationCatalog.loadTable(tableIdent);
-    this.tableIdentifier = tableName;
   }
 
   @After
@@ -150,19 +138,26 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
     sql("DROP TABLE IF EXISTS %s", tableName);
   }
 
-  @SuppressWarnings("unchecked")
   @Test
   public void testReadStreamOnIcebergTableWithMultipleSnapshots() throws Exception {
     List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
-    appendDataAsMultipleSnapshots(expected, tableIdentifier);
+    appendDataAsMultipleSnapshots(expected);
 
-    table.refresh();
+    StreamingQuery query = startStream();
 
-    Dataset<Row> df = spark.readStream()
-        .format("iceberg")
-        .load(tableIdentifier);
-    List<SimpleRecord> actual = processAvailable(df);
+    List<SimpleRecord> actual = rowsAvailable(query);
+    Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
+  }
+
+  @Test
+  public void testReadStreamOnIcebergThenAddData() throws Exception {
+    List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
+
+    StreamingQuery query = startStream();
 
+    appendDataAsMultipleSnapshots(expected);
+
+    List<SimpleRecord> actual = rowsAvailable(query);
     Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
   }
 
@@ -172,76 +167,93 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
         new SimpleRecord(-2, "minustwo"),
         new SimpleRecord(-1, "minusone"),
         new SimpleRecord(0, "zero"));
-    appendData(dataBeforeTimestamp, tableIdentifier, "parquet");
+
+    appendData(dataBeforeTimestamp);
 
     table.refresh();
     long streamStartTimestamp = table.currentSnapshot().timestampMillis() + 1;
-    waitUntilAfter(streamStartTimestamp);
 
-    List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
-    appendDataAsMultipleSnapshots(expected, tableIdentifier);
+    StreamingQuery query = startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(streamStartTimestamp));
 
-    table.refresh();
+    List<SimpleRecord> empty = rowsAvailable(query);
+    Assertions.assertThat(empty.isEmpty()).isTrue();
 
-    Dataset<Row> df = spark.readStream()
-        .format("iceberg")
-        .option(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(streamStartTimestamp))
-        .load(tableIdentifier);
-    List<SimpleRecord> actual = processAvailable(df);
+    List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
+    appendDataAsMultipleSnapshots(expected);
+
+    List<SimpleRecord> actual = rowsAvailable(query);
 
     Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
   }
 
   @Test
-  public void testReadingStreamAfterLatestTimestamp() throws Exception {
-    List<SimpleRecord> dataBeforeTimestamp = Lists.newArrayList(
+  public void testReadingStreamFromFutureTimestamp() throws Exception {
+    long futureTimestamp = System.currentTimeMillis() + 10000;
+
+    StreamingQuery query = startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(futureTimestamp));
+
+    List<SimpleRecord> actual = rowsAvailable(query);
+    Assertions.assertThat(actual.isEmpty()).isTrue();
+
+    List<SimpleRecord> data = Lists.newArrayList(
         new SimpleRecord(-2, "minustwo"),
         new SimpleRecord(-1, "minusone"),
         new SimpleRecord(0, "zero"));
-    appendData(dataBeforeTimestamp, tableIdentifier, "parquet");
 
-    table.refresh();
-    long streamStartTimestamp = table.currentSnapshot().timestampMillis() + 1;
-    waitUntilAfter(streamStartTimestamp);
+    // Perform several inserts that should not show up because the fromTimestamp has not passed yet
+    IntStream.range(0, 3).forEach(x -> {
+      appendData(data);
+      Assertions.assertThat(rowsAvailable(query).isEmpty()).isTrue();
+    });
 
-    Dataset<Row> df = spark.readStream()
-        .format("iceberg")
-        .option(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(streamStartTimestamp))
-        .load(tableIdentifier);
-    List<SimpleRecord> actual = processAvailable(df);
-    Assertions.assertThat(actual.isEmpty()).isTrue();
+    waitUntilAfter(futureTimestamp);
+
+    // Data appended after the timestamp should appear
+    appendData(data);
+    actual = rowsAvailable(query);
+    Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(data);
   }
 
   @Test
-  public void testReadingStreamFromTimestampStartWithExistingTimestamp() throws Exception {
+  public void testReadingStreamFromTimestampFutureWithExistingSnapshots() throws Exception {
     List<SimpleRecord> dataBeforeTimestamp = Lists.newArrayList(
-        new SimpleRecord(-2, "minustwo"),
-        new SimpleRecord(-1, "minusone"),
-        new SimpleRecord(0, "zero"));
-    appendData(dataBeforeTimestamp, tableIdentifier, "parquet");
+        new SimpleRecord(1, "one"),
+        new SimpleRecord(2, "two"),
+        new SimpleRecord(3, "three"));
+    appendData(dataBeforeTimestamp);
 
-    table.refresh();
+    long streamStartTimestamp = System.currentTimeMillis() + 2000;
+
+    // Start the stream with a future timestamp after the current snapshot
+    StreamingQuery query = startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(streamStartTimestamp));
+    List<SimpleRecord> actual = rowsAvailable(query);
+    Assert.assertEquals(Collections.emptyList(), actual);
+
+    // Stream should contain data added after the timestamp elapses
+    waitUntilAfter(streamStartTimestamp);
+    List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
+    appendDataAsMultipleSnapshots(expected);
+    Assertions.assertThat(rowsAvailable(query)).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
+  }
+
+  @Test
+  public void testReadingStreamFromTimestampOfExistingSnapshot() throws Exception {
     List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
 
-    // Append the first expected data
-    appendData(expected.get(0), tableIdentifier, "parquet");
+    // Create an existing snapshot with some data
+    appendData(expected.get(0));
     table.refresh();
-    long streamStartTimestamp = table.currentSnapshot().timestampMillis();
+    long firstSnapshotTime = table.currentSnapshot().timestampMillis();
 
-    // Start stream
-    Dataset<Row> df = spark.readStream()
-        .format("iceberg")
-        .option(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(streamStartTimestamp))
-        .load(tableIdentifier);
+    // Start stream giving the first Snapshot's time as the start point
+    StreamingQuery stream = startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(firstSnapshotTime));
 
     // Append rest of expected data
     for (int i = 1; i < expected.size(); i++) {
-      appendData(expected.get(i), tableIdentifier, "parquet");
+      appendData(expected.get(i));
     }
 
-    table.refresh();
-    List<SimpleRecord> actual = processAvailable(df);
-
+    List<SimpleRecord> actual = rowsAvailable(stream);
     Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
   }
 
@@ -256,97 +268,65 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
     List<SimpleRecord> thirdSnapshotRecordList = Lists.newArrayList(
         new SimpleRecord(3, "three"));
 
-    List<SimpleRecord> expectedRecordList = Lists.newArrayList(
-        new SimpleRecord(2, "two"),
-        new SimpleRecord(3, "three"));
+    List<SimpleRecord> expectedRecordList = Lists.newArrayList();
+    expectedRecordList.addAll(secondSnapshotRecordList);
+    expectedRecordList.addAll(thirdSnapshotRecordList);
 
-    appendData(firstSnapshotRecordList, tableIdentifier, "parquet");
+    appendData(firstSnapshotRecordList);
     table.refresh();
-    Snapshot firstSnapshot = table.currentSnapshot();
-
-    Dataset<Row> df = spark.readStream()
-        .format("iceberg")
-        .option(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(firstSnapshot.timestampMillis()))
-        .load(tableIdentifier);
+    long firstSnapshotid = table.currentSnapshot().snapshotId();
+    long firstSnapshotCommitTime = table.currentSnapshot().timestampMillis();
 
-    appendData(secondSnapshotRecordList, tableIdentifier, "parquet");
-    table.refresh();
 
-    appendData(thirdSnapshotRecordList, tableIdentifier, "parquet");
-    table.refresh();
+    appendData(secondSnapshotRecordList);
+    appendData(thirdSnapshotRecordList);
 
-    table.expireSnapshots().expireSnapshotId(firstSnapshot.snapshotId()).commit();
-    table.refresh();
+    table.expireSnapshots().expireSnapshotId(firstSnapshotid).commit();
 
-    List<SimpleRecord> actual = processAvailable(df);
-    Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expectedRecordList));
+    StreamingQuery query = startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, String.valueOf(firstSnapshotCommitTime));
+    List<SimpleRecord> actual = rowsAvailable(query);
+    Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(expectedRecordList);
   }
 
   @Test
-  public void testReadingStreamFromTimestampGreaterThanLatestSnapshotTime() throws Exception {
-    List<SimpleRecord> dataBeforeTimestamp = Lists.newArrayList(
-        new SimpleRecord(1, "one"),
-        new SimpleRecord(2, "two"),
-        new SimpleRecord(3, "three"));
-    appendData(dataBeforeTimestamp, tableIdentifier, "parquet");
-
-    table.refresh();
-    long streamStartTimestamp = table.currentSnapshot().timestampMillis() + 1;
-    waitUntilAfter(streamStartTimestamp);
-
-    // Test stream with Timestamp > Latest Snapshot Time
-    Dataset<Row> df = spark.readStream()
-        .format("iceberg")
-        .option(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(streamStartTimestamp))
-        .load(tableIdentifier);
-    List<SimpleRecord> actual = processAvailable(df);
-    Assert.assertEquals(Collections.emptyList(), actual);
-
-    // Test stream after new data is added
-    List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
-    appendDataAsMultipleSnapshots(expected, tableIdentifier);
-    table.refresh();
-    Assertions.assertThat(processAvailable(df, "newdata"))
-        .containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
-  }
-
-  @SuppressWarnings("unchecked")
-  @Test
   public void testResumingStreamReadFromCheckpoint() throws Exception {
     File writerCheckpointFolder = temp.newFolder("writer-checkpoint-folder");
     File writerCheckpoint = new File(writerCheckpointFolder, "writer-checkpoint");
-    final String tempView = "microBatchView";
+    File output = temp.newFolder();
 
-    Dataset<Row> df = spark.readStream()
+    DataStreamWriter querySource = spark.readStream()
         .format("iceberg")
-        .load(tableIdentifier);
-
-    // Trigger.Once with the combination of StreamingQuery.awaitTermination, which succeeds after this code
-    // will result in stopping the stream.
-    // This is how Stream STOP and RESUME is simulated in this Test Case.
-    DataStreamWriter<Row> singleBatchWriter = df.writeStream()
-        .trigger(Trigger.Once())
+        .load(tableName)
+        .writeStream()
         .option("checkpointLocation", writerCheckpoint.toString())
-        .foreachBatch((batchDF, batchId) -> {
-          batchDF.createOrReplaceGlobalTempView(tempView);
-        });
-
-    String globalTempView = "global_temp." + tempView;
+        .format("parquet")
+        .queryName("checkpoint_test")
+        .option("path", output.getPath());
 
-    List<SimpleRecord> processStreamOnEmptyIcebergTable = processMicroBatch(singleBatchWriter, globalTempView);
-    Assert.assertEquals(Collections.emptyList(), processStreamOnEmptyIcebergTable);
+    StreamingQuery startQuery = querySource.start();
+    startQuery.processAllAvailable();
+    startQuery.stop();
 
+    List<SimpleRecord> expected = Lists.newArrayList();
     for (List<List<SimpleRecord>> expectedCheckpoint : TEST_DATA_MULTIPLE_WRITES_MULTIPLE_SNAPSHOTS) {
-      appendDataAsMultipleSnapshots(expectedCheckpoint, tableIdentifier);
-      table.refresh();
-
-      List<SimpleRecord> actualDataInCurrentMicroBatch = processMicroBatch(singleBatchWriter, globalTempView);
-      Assertions.assertThat(actualDataInCurrentMicroBatch)
-          .containsExactlyInAnyOrderElementsOf(Iterables.concat(expectedCheckpoint));
+      // New data was added while the stream was down
+      appendDataAsMultipleSnapshots(expectedCheckpoint);
+      expected.addAll(Lists.newArrayList(Iterables.concat(Iterables.concat(expectedCheckpoint))));
+
+      // Stream starts up again from the checkpoint, reads the newly added data, and shuts down
+      StreamingQuery restartedQuery = querySource.start();
+      restartedQuery.processAllAvailable();
+      restartedQuery.stop();
+
+      // Read data added by the stream
+      List<SimpleRecord> actual = spark.read()
+          .load(output.getPath())
+          .as(Encoders.bean(SimpleRecord.class))
+          .collectAsList();
+      Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
     }
   }
 
-  @SuppressWarnings("unchecked")
   @Test
   public void testParquetOrcAvroDataInOneTable() throws Exception {
     List<SimpleRecord> parquetFileRecords = Lists.newArrayList(
@@ -362,33 +342,24 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
         new SimpleRecord(6, "six"),
         new SimpleRecord(7, "seven"));
 
-    appendData(parquetFileRecords, tableIdentifier, "parquet");
-    appendData(orcFileRecords, tableIdentifier, "orc");
-    appendData(avroFileRecords, tableIdentifier, "avro");
+    appendData(parquetFileRecords);
+    appendData(orcFileRecords, "orc");
+    appendData(avroFileRecords, "avro");
 
     table.refresh();
 
-    Dataset<Row> ds = spark.readStream()
-        .format("iceberg")
-        .load(tableIdentifier);
-    Assertions.assertThat(processAvailable(ds))
+    StreamingQuery query = startStream();
+    Assertions.assertThat(rowsAvailable(query))
         .containsExactlyInAnyOrderElementsOf(Iterables.concat(parquetFileRecords, orcFileRecords, avroFileRecords));
   }
 
-  @SuppressWarnings("unchecked")
   @Test
   public void testReadStreamFromEmptyTable() throws Exception {
-    table.refresh();
-
-    Dataset<Row> df = spark.readStream()
-        .format("iceberg")
-        .load(tableIdentifier);
-
-    List<SimpleRecord> actual = processAvailable(df);
+    StreamingQuery stream = startStream();
+    List<SimpleRecord> actual = rowsAvailable(stream);
     Assert.assertEquals(Collections.emptyList(), actual);
   }
 
-  @SuppressWarnings("unchecked")
   @Test
   public void testReadStreamWithSnapshotTypeOverwriteErrorsOut() throws Exception {
     // upgrade table to version 2 - to facilitate creation of Snapshot of type OVERWRITE.
@@ -398,7 +369,7 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
 
     // fill table with some initial data
     List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
-    appendDataAsMultipleSnapshots(dataAcrossSnapshots, tableIdentifier);
+    appendDataAsMultipleSnapshots(dataAcrossSnapshots);
 
     Schema deleteRowSchema = table.schema().select("data");
     Record dataDelete = GenericRecord.create(deleteRowSchema);
@@ -416,29 +387,21 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
     // check pre-condition - that the above Delete file write - actually resulted in snapshot of type OVERWRITE
     Assert.assertEquals(DataOperations.OVERWRITE, table.currentSnapshot().operation());
 
-    Dataset<Row> df = spark.readStream()
-        .format("iceberg")
-        .load(tableIdentifier);
-    StreamingQuery streamingQuery = df.writeStream()
-        .format("memory")
-        .queryName("testtablewithoverwrites")
-        .outputMode(OutputMode.Append())
-        .start();
+    StreamingQuery query = startStream();
 
     AssertHelpers.assertThrowsCause(
         "Streaming should fail with IllegalStateException, as the snapshot is not of type APPEND",
         IllegalStateException.class,
         "Cannot process overwrite snapshot",
-        () -> streamingQuery.processAllAvailable()
+        () -> query.processAllAvailable()
     );
   }
 
-  @SuppressWarnings("unchecked")
   @Test
   public void testReadStreamWithSnapshotTypeReplaceIgnoresReplace() throws Exception {
     // fill table with some data
     List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
-    appendDataAsMultipleSnapshots(expected, tableIdentifier);
+    appendDataAsMultipleSnapshots(expected);
 
     table.refresh();
 
@@ -450,15 +413,11 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
     // check pre-condition
     Assert.assertEquals(DataOperations.REPLACE, table.currentSnapshot().operation());
 
-    Dataset<Row> df = spark.readStream()
-        .format("iceberg")
-        .load(tableIdentifier);
-
-    List<SimpleRecord> actual = processAvailable(df);
+    StreamingQuery query = startStream();
+    List<SimpleRecord> actual = rowsAvailable(query);
     Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
   }
 
-  @SuppressWarnings("unchecked")
   @Test
   public void testReadStreamWithSnapshotTypeDeleteErrorsOut() throws Exception {
     table.updateSpec()
@@ -470,7 +429,7 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
 
     // fill table with some data
     List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
-    appendDataAsMultipleSnapshots(dataAcrossSnapshots, tableIdentifier);
+    appendDataAsMultipleSnapshots(dataAcrossSnapshots);
 
     table.refresh();
 
@@ -483,24 +442,16 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
     table.refresh();
     Assert.assertEquals(DataOperations.DELETE, table.currentSnapshot().operation());
 
-    Dataset<Row> df = spark.readStream()
-        .format("iceberg")
-        .load(tableIdentifier);
-    StreamingQuery streamingQuery = df.writeStream()
-        .format("memory")
-        .queryName("testtablewithdelete")
-        .outputMode(OutputMode.Append())
-        .start();
+    StreamingQuery query = startStream();
 
     AssertHelpers.assertThrowsCause(
         "Streaming should fail with IllegalStateException, as the snapshot is not of type APPEND",
         IllegalStateException.class,
         "Cannot process delete snapshot",
-        () -> streamingQuery.processAllAvailable()
+        () -> query.processAllAvailable()
     );
   }
 
-  @SuppressWarnings("unchecked")
   @Test
   public void testReadStreamWithSnapshotTypeDeleteAndSkipDeleteOption() throws Exception {
     table.updateSpec()
@@ -512,7 +463,7 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
 
     // fill table with some data
     List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
-    appendDataAsMultipleSnapshots(dataAcrossSnapshots, tableIdentifier);
+    appendDataAsMultipleSnapshots(dataAcrossSnapshots);
 
     table.refresh();
 
@@ -525,56 +476,62 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
     table.refresh();
     Assert.assertEquals(DataOperations.DELETE, table.currentSnapshot().operation());
 
-    Dataset<Row> df = spark.readStream()
-        .format("iceberg")
-        .option(SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS, "true")
-        .load(tableIdentifier);
-    Assertions.assertThat(processAvailable(df))
+    StreamingQuery query = startStream(SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS, "true");
+    Assertions.assertThat(rowsAvailable(query))
         .containsExactlyInAnyOrderElementsOf(Iterables.concat(dataAcrossSnapshots));
   }
 
-  private static List<SimpleRecord> processMicroBatch(DataStreamWriter<Row> singleBatchWriter, String viewName)
-      throws TimeoutException, StreamingQueryException {
-    StreamingQuery streamingQuery = singleBatchWriter.start();
-    streamingQuery.awaitTermination();
-
-    return spark.sql(String.format("select * from %s", viewName))
-        .as(Encoders.bean(SimpleRecord.class))
-        .collectAsList();
-  }
-
   /**
    * appends each list as a Snapshot on the iceberg table at the given location.
    * accepts a list of lists - each list representing data per snapshot.
    */
-  private static void appendDataAsMultipleSnapshots(List<List<SimpleRecord>> data, String tableIdentifier) {
+  private void appendDataAsMultipleSnapshots(List<List<SimpleRecord>> data) {
     for (List<SimpleRecord> l : data) {
-      appendData(l, tableIdentifier, "parquet");
+      appendData(l);
     }
   }
 
-  private static void appendData(List<SimpleRecord> data, String tableIdentifier, String fileFormat) {
+  private void appendData(List<SimpleRecord> data) {
+    appendData(data, "parquet");
+  }
+
+  private void appendData(List<SimpleRecord> data, String format) {
     Dataset<Row> df = spark.createDataFrame(data, SimpleRecord.class);
     df.select("id", "data").write()
         .format("iceberg")
-        .option("write-format", fileFormat)
+        .option("write-format", format)
         .mode("append")
-        .save(tableIdentifier);
+        .save(tableName);
   }
 
-  private static List<SimpleRecord> processAvailable(Dataset<Row> df, String tableName) throws TimeoutException {
-    StreamingQuery streamingQuery = df.writeStream()
+  private static final String MEMORY_TABLE = "_stream_view_mem";
+
+  private StreamingQuery startStream(Map<String, String> options) throws TimeoutException {
+    return spark.readStream()
+        .options(options)
+        .format("iceberg")
+        .load(tableName)
+        .writeStream()
+        .options(options)
         .format("memory")
-        .queryName(tableName)
+        .queryName(MEMORY_TABLE)
         .outputMode(OutputMode.Append())
         .start();
-    streamingQuery.processAllAvailable();
-    return spark.sql("select * from " + tableName)
+  }
+
+  private StreamingQuery startStream() throws TimeoutException {
+    return startStream(Collections.emptyMap());
+  }
+
+  private StreamingQuery startStream(String key, String value) throws TimeoutException {
+    return startStream(ImmutableMap.of(key, value));
+  }
+
+  private List<SimpleRecord> rowsAvailable(StreamingQuery query) {
+    query.processAllAvailable();
+    return spark.sql("select * from " + MEMORY_TABLE)
         .as(Encoders.bean(SimpleRecord.class))
         .collectAsList();
   }
 
-  private static List<SimpleRecord> processAvailable(Dataset<Row> df) throws TimeoutException {
-    return processAvailable(df, "test12");
-  }
 }