Posted to commits@iceberg.apache.org by ru...@apache.org on 2022/01/21 20:00:28 UTC
[iceberg] branch master updated: Spark: Backport Streaming Test Refactors (#3948)
This is an automated email from the ASF dual-hosted git repository.
russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 4d6b6b4 Spark: Backport Streaming Test Refactors (#3948)
4d6b6b4 is described below
commit 4d6b6b41afc959b4737306ebb07f30d1bdc54767
Author: Russell Spitzer <ru...@GMAIL.COM>
AuthorDate: Fri Jan 21 14:00:19 2022 -0600
Spark: Backport Streaming Test Refactors (#3948)
Back-porting test refactor from #3775
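For orientation while reading the patch: the refactor drops the per-test readStream/writeStream boilerplate (and the old processAvailable/processMicroBatch helpers) in favor of a shared startStream/rowsAvailable pair backed by a Spark memory sink. A condensed sketch of the new helpers follows, paraphrased from the diff below; the names and bodies come from the patch itself, the inline comments are added here only for context, and the surrounding test class members (spark, tableName, SimpleRecord) are assumed:

    private static final String MEMORY_TABLE = "_stream_view_mem";

    // Start a streaming read of the Iceberg table and pipe it into an in-memory sink.
    private StreamingQuery startStream(Map<String, String> options) throws TimeoutException {
      return spark.readStream()
          .options(options)
          .format("iceberg")
          .load(tableName)
          .writeStream()
          .options(options)
          .format("memory")
          .queryName(MEMORY_TABLE)
          .outputMode(OutputMode.Append())
          .start();
    }

    // Drain everything currently available and return the rows collected by the memory sink.
    private List<SimpleRecord> rowsAvailable(StreamingQuery query) {
      query.processAllAvailable();
      return spark.sql("select * from " + MEMORY_TABLE)
          .as(Encoders.bean(SimpleRecord.class))
          .collectAsList();
    }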
---
.../spark/source/TestStructuredStreamingRead3.java | 367 +++++++++------------
.../spark/source/TestStructuredStreamingRead3.java | 367 +++++++++------------
2 files changed, 324 insertions(+), 410 deletions(-)
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index d5bb1a1..145cf78 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -24,14 +24,13 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
-import org.apache.hadoop.conf.Configuration;
+import java.util.stream.IntStream;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
@@ -40,19 +39,17 @@ import org.apache.iceberg.data.FileHelpers;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkCatalogTestBase;
import org.apache.iceberg.spark.SparkReadOptions;
-import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
-import org.apache.spark.sql.streaming.StreamingQueryException;
-import org.apache.spark.sql.streaming.Trigger;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
@@ -62,7 +59,6 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.iceberg.expressions.Expressions.ref;
-import static org.apache.iceberg.types.Types.NestedField.optional;
@RunWith(Parameterized.class)
public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
@@ -72,13 +68,6 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
}
private Table table;
- private String tableIdentifier;
-
- private static final Configuration CONF = new Configuration();
- private static final Schema SCHEMA = new Schema(
- optional(1, "id", Types.IntegerType.get()),
- optional(2, "data", Types.StringType.get())
- );
/**
* test data to be used by multiple writes
@@ -135,7 +124,6 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
"USING iceberg " +
"PARTITIONED BY (bucket(3, id))", tableName);
this.table = validationCatalog.loadTable(tableIdent);
- this.tableIdentifier = tableName;
}
@After
@@ -150,19 +138,26 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
sql("DROP TABLE IF EXISTS %s", tableName);
}
- @SuppressWarnings("unchecked")
@Test
public void testReadStreamOnIcebergTableWithMultipleSnapshots() throws Exception {
List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
- appendDataAsMultipleSnapshots(expected, tableIdentifier);
+ appendDataAsMultipleSnapshots(expected);
- table.refresh();
+ StreamingQuery query = startStream();
- Dataset<Row> df = spark.readStream()
- .format("iceberg")
- .load(tableIdentifier);
- List<SimpleRecord> actual = processAvailable(df);
+ List<SimpleRecord> actual = rowsAvailable(query);
+ Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
+ }
+
+ @Test
+ public void testReadStreamOnIcebergThenAddData() throws Exception {
+ List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
+
+ StreamingQuery query = startStream();
+ appendDataAsMultipleSnapshots(expected);
+
+ List<SimpleRecord> actual = rowsAvailable(query);
Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
}
@@ -172,76 +167,93 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
new SimpleRecord(-2, "minustwo"),
new SimpleRecord(-1, "minusone"),
new SimpleRecord(0, "zero"));
- appendData(dataBeforeTimestamp, tableIdentifier, "parquet");
+
+ appendData(dataBeforeTimestamp);
table.refresh();
long streamStartTimestamp = table.currentSnapshot().timestampMillis() + 1;
- waitUntilAfter(streamStartTimestamp);
- List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
- appendDataAsMultipleSnapshots(expected, tableIdentifier);
+ StreamingQuery query = startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(streamStartTimestamp));
- table.refresh();
+ List<SimpleRecord> empty = rowsAvailable(query);
+ Assertions.assertThat(empty.isEmpty()).isTrue();
- Dataset<Row> df = spark.readStream()
- .format("iceberg")
- .option(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(streamStartTimestamp))
- .load(tableIdentifier);
- List<SimpleRecord> actual = processAvailable(df);
+ List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
+ appendDataAsMultipleSnapshots(expected);
+
+ List<SimpleRecord> actual = rowsAvailable(query);
Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
}
@Test
- public void testReadingStreamAfterLatestTimestamp() throws Exception {
- List<SimpleRecord> dataBeforeTimestamp = Lists.newArrayList(
+ public void testReadingStreamFromFutureTimestamp() throws Exception {
+ long futureTimestamp = System.currentTimeMillis() + 10000;
+
+ StreamingQuery query = startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(futureTimestamp));
+
+ List<SimpleRecord> actual = rowsAvailable(query);
+ Assertions.assertThat(actual.isEmpty()).isTrue();
+
+ List<SimpleRecord> data = Lists.newArrayList(
new SimpleRecord(-2, "minustwo"),
new SimpleRecord(-1, "minusone"),
new SimpleRecord(0, "zero"));
- appendData(dataBeforeTimestamp, tableIdentifier, "parquet");
- table.refresh();
- long streamStartTimestamp = table.currentSnapshot().timestampMillis() + 1;
- waitUntilAfter(streamStartTimestamp);
+ // Perform several inserts that should not show up because the fromTimestamp has not elapsed
+ IntStream.range(0, 3).forEach(x -> {
+ appendData(data);
+ Assertions.assertThat(rowsAvailable(query).isEmpty()).isTrue();
+ });
- Dataset<Row> df = spark.readStream()
- .format("iceberg")
- .option(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(streamStartTimestamp))
- .load(tableIdentifier);
- List<SimpleRecord> actual = processAvailable(df);
- Assertions.assertThat(actual.isEmpty()).isTrue();
+ waitUntilAfter(futureTimestamp);
+
+ // Data appended after the timestamp should appear
+ appendData(data);
+ actual = rowsAvailable(query);
+ Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(data);
}
@Test
- public void testReadingStreamFromTimestampStartWithExistingTimestamp() throws Exception {
+ public void testReadingStreamFromTimestampFutureWithExistingSnapshots() throws Exception {
List<SimpleRecord> dataBeforeTimestamp = Lists.newArrayList(
- new SimpleRecord(-2, "minustwo"),
- new SimpleRecord(-1, "minusone"),
- new SimpleRecord(0, "zero"));
- appendData(dataBeforeTimestamp, tableIdentifier, "parquet");
+ new SimpleRecord(1, "one"),
+ new SimpleRecord(2, "two"),
+ new SimpleRecord(3, "three"));
+ appendData(dataBeforeTimestamp);
- table.refresh();
+ long streamStartTimestamp = System.currentTimeMillis() + 2000;
+
+ // Start the stream with a future timestamp after the current snapshot
+ StreamingQuery query = startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(streamStartTimestamp));
+ List<SimpleRecord> actual = rowsAvailable(query);
+ Assert.assertEquals(Collections.emptyList(), actual);
+
+ // Stream should contain data added after the timestamp elapses
+ waitUntilAfter(streamStartTimestamp);
+ List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
+ appendDataAsMultipleSnapshots(expected);
+ Assertions.assertThat(rowsAvailable(query)).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
+ }
+
+ @Test
+ public void testReadingStreamFromTimestampOfExistingSnapshot() throws Exception {
List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
- // Append the first expected data
- appendData(expected.get(0), tableIdentifier, "parquet");
+ // Create an initial snapshot with some data
+ appendData(expected.get(0));
table.refresh();
- long streamStartTimestamp = table.currentSnapshot().timestampMillis();
+ long firstSnapshotTime = table.currentSnapshot().timestampMillis();
- // Start stream
- Dataset<Row> df = spark.readStream()
- .format("iceberg")
- .option(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(streamStartTimestamp))
- .load(tableIdentifier);
+ // Start stream giving the first Snapshot's time as the start point
+ StreamingQuery stream = startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(firstSnapshotTime));
// Append rest of expected data
for (int i = 1; i < expected.size(); i++) {
- appendData(expected.get(i), tableIdentifier, "parquet");
+ appendData(expected.get(i));
}
- table.refresh();
- List<SimpleRecord> actual = processAvailable(df);
-
+ List<SimpleRecord> actual = rowsAvailable(stream);
Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
}
@@ -256,97 +268,65 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
List<SimpleRecord> thirdSnapshotRecordList = Lists.newArrayList(
new SimpleRecord(3, "three"));
- List<SimpleRecord> expectedRecordList = Lists.newArrayList(
- new SimpleRecord(2, "two"),
- new SimpleRecord(3, "three"));
+ List<SimpleRecord> expectedRecordList = Lists.newArrayList();
+ expectedRecordList.addAll(secondSnapshotRecordList);
+ expectedRecordList.addAll(thirdSnapshotRecordList);
- appendData(firstSnapshotRecordList, tableIdentifier, "parquet");
+ appendData(firstSnapshotRecordList);
table.refresh();
- Snapshot firstSnapshot = table.currentSnapshot();
-
- Dataset<Row> df = spark.readStream()
- .format("iceberg")
- .option(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(firstSnapshot.timestampMillis()))
- .load(tableIdentifier);
+ long firstSnapshotId = table.currentSnapshot().snapshotId();
+ long firstSnapshotCommitTime = table.currentSnapshot().timestampMillis();
- appendData(secondSnapshotRecordList, tableIdentifier, "parquet");
- table.refresh();
- appendData(thirdSnapshotRecordList, tableIdentifier, "parquet");
- table.refresh();
+ appendData(secondSnapshotRecordList);
+ appendData(thirdSnapshotRecordList);
- table.expireSnapshots().expireSnapshotId(firstSnapshot.snapshotId()).commit();
- table.refresh();
+ table.expireSnapshots().expireSnapshotId(firstSnapshotId).commit();
- List<SimpleRecord> actual = processAvailable(df);
- Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expectedRecordList));
+ StreamingQuery query = startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, String.valueOf(firstSnapshotCommitTime));
+ List<SimpleRecord> actual = rowsAvailable(query);
+ Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(expectedRecordList);
}
@Test
- public void testReadingStreamFromTimestampGreaterThanLatestSnapshotTime() throws Exception {
- List<SimpleRecord> dataBeforeTimestamp = Lists.newArrayList(
- new SimpleRecord(1, "one"),
- new SimpleRecord(2, "two"),
- new SimpleRecord(3, "three"));
- appendData(dataBeforeTimestamp, tableIdentifier, "parquet");
-
- table.refresh();
- long streamStartTimestamp = table.currentSnapshot().timestampMillis() + 1;
- waitUntilAfter(streamStartTimestamp);
-
- // Test stream with Timestamp > Latest Snapshot Time
- Dataset<Row> df = spark.readStream()
- .format("iceberg")
- .option(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(streamStartTimestamp))
- .load(tableIdentifier);
- List<SimpleRecord> actual = processAvailable(df);
- Assert.assertEquals(Collections.emptyList(), actual);
-
- // Test stream after new data is added
- List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
- appendDataAsMultipleSnapshots(expected, tableIdentifier);
- table.refresh();
- Assertions.assertThat(processAvailable(df, "newdata"))
- .containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
- }
-
- @SuppressWarnings("unchecked")
- @Test
public void testResumingStreamReadFromCheckpoint() throws Exception {
File writerCheckpointFolder = temp.newFolder("writer-checkpoint-folder");
File writerCheckpoint = new File(writerCheckpointFolder, "writer-checkpoint");
- final String tempView = "microBatchView";
+ File output = temp.newFolder();
- Dataset<Row> df = spark.readStream()
+ DataStreamWriter querySource = spark.readStream()
.format("iceberg")
- .load(tableIdentifier);
-
- // Trigger.Once with the combination of StreamingQuery.awaitTermination, which succeeds after this code
- // will result in stopping the stream.
- // This is how Stream STOP and RESUME is simulated in this Test Case.
- DataStreamWriter<Row> singleBatchWriter = df.writeStream()
- .trigger(Trigger.Once())
+ .load(tableName)
+ .writeStream()
.option("checkpointLocation", writerCheckpoint.toString())
- .foreachBatch((batchDF, batchId) -> {
- batchDF.createOrReplaceGlobalTempView(tempView);
- });
-
- String globalTempView = "global_temp." + tempView;
+ .format("parquet")
+ .queryName("checkpoint_test")
+ .option("path", output.getPath());
- List<SimpleRecord> processStreamOnEmptyIcebergTable = processMicroBatch(singleBatchWriter, globalTempView);
- Assert.assertEquals(Collections.emptyList(), processStreamOnEmptyIcebergTable);
+ StreamingQuery startQuery = querySource.start();
+ startQuery.processAllAvailable();
+ startQuery.stop();
+ List<SimpleRecord> expected = Lists.newArrayList();
for (List<List<SimpleRecord>> expectedCheckpoint : TEST_DATA_MULTIPLE_WRITES_MULTIPLE_SNAPSHOTS) {
- appendDataAsMultipleSnapshots(expectedCheckpoint, tableIdentifier);
- table.refresh();
-
- List<SimpleRecord> actualDataInCurrentMicroBatch = processMicroBatch(singleBatchWriter, globalTempView);
- Assertions.assertThat(actualDataInCurrentMicroBatch)
- .containsExactlyInAnyOrderElementsOf(Iterables.concat(expectedCheckpoint));
+ // New data was added while the stream was down
+ appendDataAsMultipleSnapshots(expectedCheckpoint);
+ expected.addAll(Lists.newArrayList(Iterables.concat(Iterables.concat(expectedCheckpoint))));
+
+ // Stream starts up again from the checkpoint, reads the newly added data, and shuts down
+ StreamingQuery restartedQuery = querySource.start();
+ restartedQuery.processAllAvailable();
+ restartedQuery.stop();
+
+ // Read data added by the stream
+ List<SimpleRecord> actual = spark.read()
+ .load(output.getPath())
+ .as(Encoders.bean(SimpleRecord.class))
+ .collectAsList();
+ Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
}
}
- @SuppressWarnings("unchecked")
@Test
public void testParquetOrcAvroDataInOneTable() throws Exception {
List<SimpleRecord> parquetFileRecords = Lists.newArrayList(
@@ -362,33 +342,24 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
new SimpleRecord(6, "six"),
new SimpleRecord(7, "seven"));
- appendData(parquetFileRecords, tableIdentifier, "parquet");
- appendData(orcFileRecords, tableIdentifier, "orc");
- appendData(avroFileRecords, tableIdentifier, "avro");
+ appendData(parquetFileRecords);
+ appendData(orcFileRecords, "orc");
+ appendData(avroFileRecords, "avro");
table.refresh();
- Dataset<Row> ds = spark.readStream()
- .format("iceberg")
- .load(tableIdentifier);
- Assertions.assertThat(processAvailable(ds))
+ StreamingQuery query = startStream();
+ Assertions.assertThat(rowsAvailable(query))
.containsExactlyInAnyOrderElementsOf(Iterables.concat(parquetFileRecords, orcFileRecords, avroFileRecords));
}
- @SuppressWarnings("unchecked")
@Test
public void testReadStreamFromEmptyTable() throws Exception {
- table.refresh();
-
- Dataset<Row> df = spark.readStream()
- .format("iceberg")
- .load(tableIdentifier);
-
- List<SimpleRecord> actual = processAvailable(df);
+ StreamingQuery stream = startStream();
+ List<SimpleRecord> actual = rowsAvailable(stream);
Assert.assertEquals(Collections.emptyList(), actual);
}
- @SuppressWarnings("unchecked")
@Test
public void testReadStreamWithSnapshotTypeOverwriteErrorsOut() throws Exception {
// upgrade table to version 2 - to facilitate creation of Snapshot of type OVERWRITE.
@@ -398,7 +369,7 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
// fill table with some initial data
List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
- appendDataAsMultipleSnapshots(dataAcrossSnapshots, tableIdentifier);
+ appendDataAsMultipleSnapshots(dataAcrossSnapshots);
Schema deleteRowSchema = table.schema().select("data");
Record dataDelete = GenericRecord.create(deleteRowSchema);
@@ -416,29 +387,21 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
// check pre-condition - that the above Delete file write - actually resulted in snapshot of type OVERWRITE
Assert.assertEquals(DataOperations.OVERWRITE, table.currentSnapshot().operation());
- Dataset<Row> df = spark.readStream()
- .format("iceberg")
- .load(tableIdentifier);
- StreamingQuery streamingQuery = df.writeStream()
- .format("memory")
- .queryName("testtablewithoverwrites")
- .outputMode(OutputMode.Append())
- .start();
+ StreamingQuery query = startStream();
AssertHelpers.assertThrowsCause(
"Streaming should fail with IllegalStateException, as the snapshot is not of type APPEND",
IllegalStateException.class,
"Cannot process overwrite snapshot",
- () -> streamingQuery.processAllAvailable()
+ () -> query.processAllAvailable()
);
}
- @SuppressWarnings("unchecked")
@Test
public void testReadStreamWithSnapshotTypeReplaceIgnoresReplace() throws Exception {
// fill table with some data
List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
- appendDataAsMultipleSnapshots(expected, tableIdentifier);
+ appendDataAsMultipleSnapshots(expected);
table.refresh();
@@ -450,15 +413,11 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
// check pre-condition
Assert.assertEquals(DataOperations.REPLACE, table.currentSnapshot().operation());
- Dataset<Row> df = spark.readStream()
- .format("iceberg")
- .load(tableIdentifier);
-
- List<SimpleRecord> actual = processAvailable(df);
+ StreamingQuery query = startStream();
+ List<SimpleRecord> actual = rowsAvailable(query);
Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
}
- @SuppressWarnings("unchecked")
@Test
public void testReadStreamWithSnapshotTypeDeleteErrorsOut() throws Exception {
table.updateSpec()
@@ -470,7 +429,7 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
// fill table with some data
List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
- appendDataAsMultipleSnapshots(dataAcrossSnapshots, tableIdentifier);
+ appendDataAsMultipleSnapshots(dataAcrossSnapshots);
table.refresh();
@@ -483,24 +442,16 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
table.refresh();
Assert.assertEquals(DataOperations.DELETE, table.currentSnapshot().operation());
- Dataset<Row> df = spark.readStream()
- .format("iceberg")
- .load(tableIdentifier);
- StreamingQuery streamingQuery = df.writeStream()
- .format("memory")
- .queryName("testtablewithdelete")
- .outputMode(OutputMode.Append())
- .start();
+ StreamingQuery query = startStream();
AssertHelpers.assertThrowsCause(
"Streaming should fail with IllegalStateException, as the snapshot is not of type APPEND",
IllegalStateException.class,
"Cannot process delete snapshot",
- () -> streamingQuery.processAllAvailable()
+ () -> query.processAllAvailable()
);
}
- @SuppressWarnings("unchecked")
@Test
public void testReadStreamWithSnapshotTypeDeleteAndSkipDeleteOption() throws Exception {
table.updateSpec()
@@ -512,7 +463,7 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
// fill table with some data
List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
- appendDataAsMultipleSnapshots(dataAcrossSnapshots, tableIdentifier);
+ appendDataAsMultipleSnapshots(dataAcrossSnapshots);
table.refresh();
@@ -525,56 +476,62 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
table.refresh();
Assert.assertEquals(DataOperations.DELETE, table.currentSnapshot().operation());
- Dataset<Row> df = spark.readStream()
- .format("iceberg")
- .option(SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS, "true")
- .load(tableIdentifier);
- Assertions.assertThat(processAvailable(df))
+ StreamingQuery query = startStream(SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS, "true");
+ Assertions.assertThat(rowsAvailable(query))
.containsExactlyInAnyOrderElementsOf(Iterables.concat(dataAcrossSnapshots));
}
- private static List<SimpleRecord> processMicroBatch(DataStreamWriter<Row> singleBatchWriter, String viewName)
- throws TimeoutException, StreamingQueryException {
- StreamingQuery streamingQuery = singleBatchWriter.start();
- streamingQuery.awaitTermination();
-
- return spark.sql(String.format("select * from %s", viewName))
- .as(Encoders.bean(SimpleRecord.class))
- .collectAsList();
- }
-
/**
* appends each list as a Snapshot on the iceberg table at the given location.
* accepts a list of lists - each list representing data per snapshot.
*/
- private static void appendDataAsMultipleSnapshots(List<List<SimpleRecord>> data, String tableIdentifier) {
+ private void appendDataAsMultipleSnapshots(List<List<SimpleRecord>> data) {
for (List<SimpleRecord> l : data) {
- appendData(l, tableIdentifier, "parquet");
+ appendData(l);
}
}
- private static void appendData(List<SimpleRecord> data, String tableIdentifier, String fileFormat) {
+ private void appendData(List<SimpleRecord> data) {
+ appendData(data, "parquet");
+ }
+
+ private void appendData(List<SimpleRecord> data, String format) {
Dataset<Row> df = spark.createDataFrame(data, SimpleRecord.class);
df.select("id", "data").write()
.format("iceberg")
- .option("write-format", fileFormat)
+ .option("write-format", format)
.mode("append")
- .save(tableIdentifier);
+ .save(tableName);
}
- private static List<SimpleRecord> processAvailable(Dataset<Row> df, String tableName) throws TimeoutException {
- StreamingQuery streamingQuery = df.writeStream()
+ private static final String MEMORY_TABLE = "_stream_view_mem";
+
+ private StreamingQuery startStream(Map<String, String> options) throws TimeoutException {
+ return spark.readStream()
+ .options(options)
+ .format("iceberg")
+ .load(tableName)
+ .writeStream()
+ .options(options)
.format("memory")
- .queryName(tableName)
+ .queryName(MEMORY_TABLE)
.outputMode(OutputMode.Append())
.start();
- streamingQuery.processAllAvailable();
- return spark.sql("select * from " + tableName)
+ }
+
+ private StreamingQuery startStream() throws TimeoutException {
+ return startStream(Collections.emptyMap());
+ }
+
+ private StreamingQuery startStream(String key, String value) throws TimeoutException {
+ return startStream(ImmutableMap.of(key, value));
+ }
+
+ private List<SimpleRecord> rowsAvailable(StreamingQuery query) {
+ query.processAllAvailable();
+ return spark.sql("select * from " + MEMORY_TABLE)
.as(Encoders.bean(SimpleRecord.class))
.collectAsList();
}
- private static List<SimpleRecord> processAvailable(Dataset<Row> df) throws TimeoutException {
- return processAvailable(df, "test12");
- }
}
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index d5bb1a1..145cf78 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -24,14 +24,13 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
-import org.apache.hadoop.conf.Configuration;
+import java.util.stream.IntStream;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
-import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
@@ -40,19 +39,17 @@ import org.apache.iceberg.data.FileHelpers;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkCatalogTestBase;
import org.apache.iceberg.spark.SparkReadOptions;
-import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
-import org.apache.spark.sql.streaming.StreamingQueryException;
-import org.apache.spark.sql.streaming.Trigger;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
@@ -62,7 +59,6 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.iceberg.expressions.Expressions.ref;
-import static org.apache.iceberg.types.Types.NestedField.optional;
@RunWith(Parameterized.class)
public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
@@ -72,13 +68,6 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
}
private Table table;
- private String tableIdentifier;
-
- private static final Configuration CONF = new Configuration();
- private static final Schema SCHEMA = new Schema(
- optional(1, "id", Types.IntegerType.get()),
- optional(2, "data", Types.StringType.get())
- );
/**
* test data to be used by multiple writes
@@ -135,7 +124,6 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
"USING iceberg " +
"PARTITIONED BY (bucket(3, id))", tableName);
this.table = validationCatalog.loadTable(tableIdent);
- this.tableIdentifier = tableName;
}
@After
@@ -150,19 +138,26 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
sql("DROP TABLE IF EXISTS %s", tableName);
}
- @SuppressWarnings("unchecked")
@Test
public void testReadStreamOnIcebergTableWithMultipleSnapshots() throws Exception {
List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
- appendDataAsMultipleSnapshots(expected, tableIdentifier);
+ appendDataAsMultipleSnapshots(expected);
- table.refresh();
+ StreamingQuery query = startStream();
- Dataset<Row> df = spark.readStream()
- .format("iceberg")
- .load(tableIdentifier);
- List<SimpleRecord> actual = processAvailable(df);
+ List<SimpleRecord> actual = rowsAvailable(query);
+ Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
+ }
+
+ @Test
+ public void testReadStreamOnIcebergThenAddData() throws Exception {
+ List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
+
+ StreamingQuery query = startStream();
+ appendDataAsMultipleSnapshots(expected);
+
+ List<SimpleRecord> actual = rowsAvailable(query);
Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
}
@@ -172,76 +167,93 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
new SimpleRecord(-2, "minustwo"),
new SimpleRecord(-1, "minusone"),
new SimpleRecord(0, "zero"));
- appendData(dataBeforeTimestamp, tableIdentifier, "parquet");
+
+ appendData(dataBeforeTimestamp);
table.refresh();
long streamStartTimestamp = table.currentSnapshot().timestampMillis() + 1;
- waitUntilAfter(streamStartTimestamp);
- List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
- appendDataAsMultipleSnapshots(expected, tableIdentifier);
+ StreamingQuery query = startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(streamStartTimestamp));
- table.refresh();
+ List<SimpleRecord> empty = rowsAvailable(query);
+ Assertions.assertThat(empty.isEmpty()).isTrue();
- Dataset<Row> df = spark.readStream()
- .format("iceberg")
- .option(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(streamStartTimestamp))
- .load(tableIdentifier);
- List<SimpleRecord> actual = processAvailable(df);
+ List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
+ appendDataAsMultipleSnapshots(expected);
+
+ List<SimpleRecord> actual = rowsAvailable(query);
Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
}
@Test
- public void testReadingStreamAfterLatestTimestamp() throws Exception {
- List<SimpleRecord> dataBeforeTimestamp = Lists.newArrayList(
+ public void testReadingStreamFromFutureTimestamp() throws Exception {
+ long futureTimestamp = System.currentTimeMillis() + 10000;
+
+ StreamingQuery query = startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(futureTimestamp));
+
+ List<SimpleRecord> actual = rowsAvailable(query);
+ Assertions.assertThat(actual.isEmpty()).isTrue();
+
+ List<SimpleRecord> data = Lists.newArrayList(
new SimpleRecord(-2, "minustwo"),
new SimpleRecord(-1, "minusone"),
new SimpleRecord(0, "zero"));
- appendData(dataBeforeTimestamp, tableIdentifier, "parquet");
- table.refresh();
- long streamStartTimestamp = table.currentSnapshot().timestampMillis() + 1;
- waitUntilAfter(streamStartTimestamp);
+ // Perform several inserts that should not show up because the fromTimestamp has not elapsed
+ IntStream.range(0, 3).forEach(x -> {
+ appendData(data);
+ Assertions.assertThat(rowsAvailable(query).isEmpty()).isTrue();
+ });
- Dataset<Row> df = spark.readStream()
- .format("iceberg")
- .option(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(streamStartTimestamp))
- .load(tableIdentifier);
- List<SimpleRecord> actual = processAvailable(df);
- Assertions.assertThat(actual.isEmpty()).isTrue();
+ waitUntilAfter(futureTimestamp);
+
+ // Data appended after the timestamp should appear
+ appendData(data);
+ actual = rowsAvailable(query);
+ Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(data);
}
@Test
- public void testReadingStreamFromTimestampStartWithExistingTimestamp() throws Exception {
+ public void testReadingStreamFromTimestampFutureWithExistingSnapshots() throws Exception {
List<SimpleRecord> dataBeforeTimestamp = Lists.newArrayList(
- new SimpleRecord(-2, "minustwo"),
- new SimpleRecord(-1, "minusone"),
- new SimpleRecord(0, "zero"));
- appendData(dataBeforeTimestamp, tableIdentifier, "parquet");
+ new SimpleRecord(1, "one"),
+ new SimpleRecord(2, "two"),
+ new SimpleRecord(3, "three"));
+ appendData(dataBeforeTimestamp);
- table.refresh();
+ long streamStartTimestamp = System.currentTimeMillis() + 2000;
+
+ // Start the stream with a future timestamp after the current snapshot
+ StreamingQuery query = startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(streamStartTimestamp));
+ List<SimpleRecord> actual = rowsAvailable(query);
+ Assert.assertEquals(Collections.emptyList(), actual);
+
+ // Stream should contain data added after the timestamp elapses
+ waitUntilAfter(streamStartTimestamp);
+ List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
+ appendDataAsMultipleSnapshots(expected);
+ Assertions.assertThat(rowsAvailable(query)).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
+ }
+
+ @Test
+ public void testReadingStreamFromTimestampOfExistingSnapshot() throws Exception {
List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
- // Append the first expected data
- appendData(expected.get(0), tableIdentifier, "parquet");
+ // Create an initial snapshot with some data
+ appendData(expected.get(0));
table.refresh();
- long streamStartTimestamp = table.currentSnapshot().timestampMillis();
+ long firstSnapshotTime = table.currentSnapshot().timestampMillis();
- // Start stream
- Dataset<Row> df = spark.readStream()
- .format("iceberg")
- .option(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(streamStartTimestamp))
- .load(tableIdentifier);
+ // Start stream giving the first Snapshot's time as the start point
+ StreamingQuery stream = startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(firstSnapshotTime));
// Append rest of expected data
for (int i = 1; i < expected.size(); i++) {
- appendData(expected.get(i), tableIdentifier, "parquet");
+ appendData(expected.get(i));
}
- table.refresh();
- List<SimpleRecord> actual = processAvailable(df);
-
+ List<SimpleRecord> actual = rowsAvailable(stream);
Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
}
@@ -256,97 +268,65 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
List<SimpleRecord> thirdSnapshotRecordList = Lists.newArrayList(
new SimpleRecord(3, "three"));
- List<SimpleRecord> expectedRecordList = Lists.newArrayList(
- new SimpleRecord(2, "two"),
- new SimpleRecord(3, "three"));
+ List<SimpleRecord> expectedRecordList = Lists.newArrayList();
+ expectedRecordList.addAll(secondSnapshotRecordList);
+ expectedRecordList.addAll(thirdSnapshotRecordList);
- appendData(firstSnapshotRecordList, tableIdentifier, "parquet");
+ appendData(firstSnapshotRecordList);
table.refresh();
- Snapshot firstSnapshot = table.currentSnapshot();
-
- Dataset<Row> df = spark.readStream()
- .format("iceberg")
- .option(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(firstSnapshot.timestampMillis()))
- .load(tableIdentifier);
+ long firstSnapshotId = table.currentSnapshot().snapshotId();
+ long firstSnapshotCommitTime = table.currentSnapshot().timestampMillis();
- appendData(secondSnapshotRecordList, tableIdentifier, "parquet");
- table.refresh();
- appendData(thirdSnapshotRecordList, tableIdentifier, "parquet");
- table.refresh();
+ appendData(secondSnapshotRecordList);
+ appendData(thirdSnapshotRecordList);
- table.expireSnapshots().expireSnapshotId(firstSnapshot.snapshotId()).commit();
- table.refresh();
+ table.expireSnapshots().expireSnapshotId(firstSnapshotId).commit();
- List<SimpleRecord> actual = processAvailable(df);
- Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expectedRecordList));
+ StreamingQuery query = startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, String.valueOf(firstSnapshotCommitTime));
+ List<SimpleRecord> actual = rowsAvailable(query);
+ Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(expectedRecordList);
}
@Test
- public void testReadingStreamFromTimestampGreaterThanLatestSnapshotTime() throws Exception {
- List<SimpleRecord> dataBeforeTimestamp = Lists.newArrayList(
- new SimpleRecord(1, "one"),
- new SimpleRecord(2, "two"),
- new SimpleRecord(3, "three"));
- appendData(dataBeforeTimestamp, tableIdentifier, "parquet");
-
- table.refresh();
- long streamStartTimestamp = table.currentSnapshot().timestampMillis() + 1;
- waitUntilAfter(streamStartTimestamp);
-
- // Test stream with Timestamp > Latest Snapshot Time
- Dataset<Row> df = spark.readStream()
- .format("iceberg")
- .option(SparkReadOptions.STREAM_FROM_TIMESTAMP, Long.toString(streamStartTimestamp))
- .load(tableIdentifier);
- List<SimpleRecord> actual = processAvailable(df);
- Assert.assertEquals(Collections.emptyList(), actual);
-
- // Test stream after new data is added
- List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
- appendDataAsMultipleSnapshots(expected, tableIdentifier);
- table.refresh();
- Assertions.assertThat(processAvailable(df, "newdata"))
- .containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
- }
-
- @SuppressWarnings("unchecked")
- @Test
public void testResumingStreamReadFromCheckpoint() throws Exception {
File writerCheckpointFolder = temp.newFolder("writer-checkpoint-folder");
File writerCheckpoint = new File(writerCheckpointFolder, "writer-checkpoint");
- final String tempView = "microBatchView";
+ File output = temp.newFolder();
- Dataset<Row> df = spark.readStream()
+ DataStreamWriter querySource = spark.readStream()
.format("iceberg")
- .load(tableIdentifier);
-
- // Trigger.Once with the combination of StreamingQuery.awaitTermination, which succeeds after this code
- // will result in stopping the stream.
- // This is how Stream STOP and RESUME is simulated in this Test Case.
- DataStreamWriter<Row> singleBatchWriter = df.writeStream()
- .trigger(Trigger.Once())
+ .load(tableName)
+ .writeStream()
.option("checkpointLocation", writerCheckpoint.toString())
- .foreachBatch((batchDF, batchId) -> {
- batchDF.createOrReplaceGlobalTempView(tempView);
- });
-
- String globalTempView = "global_temp." + tempView;
+ .format("parquet")
+ .queryName("checkpoint_test")
+ .option("path", output.getPath());
- List<SimpleRecord> processStreamOnEmptyIcebergTable = processMicroBatch(singleBatchWriter, globalTempView);
- Assert.assertEquals(Collections.emptyList(), processStreamOnEmptyIcebergTable);
+ StreamingQuery startQuery = querySource.start();
+ startQuery.processAllAvailable();
+ startQuery.stop();
+ List<SimpleRecord> expected = Lists.newArrayList();
for (List<List<SimpleRecord>> expectedCheckpoint : TEST_DATA_MULTIPLE_WRITES_MULTIPLE_SNAPSHOTS) {
- appendDataAsMultipleSnapshots(expectedCheckpoint, tableIdentifier);
- table.refresh();
-
- List<SimpleRecord> actualDataInCurrentMicroBatch = processMicroBatch(singleBatchWriter, globalTempView);
- Assertions.assertThat(actualDataInCurrentMicroBatch)
- .containsExactlyInAnyOrderElementsOf(Iterables.concat(expectedCheckpoint));
+ // New data was added while the stream was down
+ appendDataAsMultipleSnapshots(expectedCheckpoint);
+ expected.addAll(Lists.newArrayList(Iterables.concat(Iterables.concat(expectedCheckpoint))));
+
+ // Stream starts up again from the checkpoint, reads the newly added data, and shuts down
+ StreamingQuery restartedQuery = querySource.start();
+ restartedQuery.processAllAvailable();
+ restartedQuery.stop();
+
+ // Read data added by the stream
+ List<SimpleRecord> actual = spark.read()
+ .load(output.getPath())
+ .as(Encoders.bean(SimpleRecord.class))
+ .collectAsList();
+ Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
}
}
- @SuppressWarnings("unchecked")
@Test
public void testParquetOrcAvroDataInOneTable() throws Exception {
List<SimpleRecord> parquetFileRecords = Lists.newArrayList(
@@ -362,33 +342,24 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
new SimpleRecord(6, "six"),
new SimpleRecord(7, "seven"));
- appendData(parquetFileRecords, tableIdentifier, "parquet");
- appendData(orcFileRecords, tableIdentifier, "orc");
- appendData(avroFileRecords, tableIdentifier, "avro");
+ appendData(parquetFileRecords);
+ appendData(orcFileRecords, "orc");
+ appendData(avroFileRecords, "avro");
table.refresh();
- Dataset<Row> ds = spark.readStream()
- .format("iceberg")
- .load(tableIdentifier);
- Assertions.assertThat(processAvailable(ds))
+ StreamingQuery query = startStream();
+ Assertions.assertThat(rowsAvailable(query))
.containsExactlyInAnyOrderElementsOf(Iterables.concat(parquetFileRecords, orcFileRecords, avroFileRecords));
}
- @SuppressWarnings("unchecked")
@Test
public void testReadStreamFromEmptyTable() throws Exception {
- table.refresh();
-
- Dataset<Row> df = spark.readStream()
- .format("iceberg")
- .load(tableIdentifier);
-
- List<SimpleRecord> actual = processAvailable(df);
+ StreamingQuery stream = startStream();
+ List<SimpleRecord> actual = rowsAvailable(stream);
Assert.assertEquals(Collections.emptyList(), actual);
}
- @SuppressWarnings("unchecked")
@Test
public void testReadStreamWithSnapshotTypeOverwriteErrorsOut() throws Exception {
// upgrade table to version 2 - to facilitate creation of Snapshot of type OVERWRITE.
@@ -398,7 +369,7 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
// fill table with some initial data
List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
- appendDataAsMultipleSnapshots(dataAcrossSnapshots, tableIdentifier);
+ appendDataAsMultipleSnapshots(dataAcrossSnapshots);
Schema deleteRowSchema = table.schema().select("data");
Record dataDelete = GenericRecord.create(deleteRowSchema);
@@ -416,29 +387,21 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
// check pre-condition - that the above Delete file write - actually resulted in snapshot of type OVERWRITE
Assert.assertEquals(DataOperations.OVERWRITE, table.currentSnapshot().operation());
- Dataset<Row> df = spark.readStream()
- .format("iceberg")
- .load(tableIdentifier);
- StreamingQuery streamingQuery = df.writeStream()
- .format("memory")
- .queryName("testtablewithoverwrites")
- .outputMode(OutputMode.Append())
- .start();
+ StreamingQuery query = startStream();
AssertHelpers.assertThrowsCause(
"Streaming should fail with IllegalStateException, as the snapshot is not of type APPEND",
IllegalStateException.class,
"Cannot process overwrite snapshot",
- () -> streamingQuery.processAllAvailable()
+ () -> query.processAllAvailable()
);
}
- @SuppressWarnings("unchecked")
@Test
public void testReadStreamWithSnapshotTypeReplaceIgnoresReplace() throws Exception {
// fill table with some data
List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
- appendDataAsMultipleSnapshots(expected, tableIdentifier);
+ appendDataAsMultipleSnapshots(expected);
table.refresh();
@@ -450,15 +413,11 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
// check pre-condition
Assert.assertEquals(DataOperations.REPLACE, table.currentSnapshot().operation());
- Dataset<Row> df = spark.readStream()
- .format("iceberg")
- .load(tableIdentifier);
-
- List<SimpleRecord> actual = processAvailable(df);
+ StreamingQuery query = startStream();
+ List<SimpleRecord> actual = rowsAvailable(query);
Assertions.assertThat(actual).containsExactlyInAnyOrderElementsOf(Iterables.concat(expected));
}
- @SuppressWarnings("unchecked")
@Test
public void testReadStreamWithSnapshotTypeDeleteErrorsOut() throws Exception {
table.updateSpec()
@@ -470,7 +429,7 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
// fill table with some data
List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
- appendDataAsMultipleSnapshots(dataAcrossSnapshots, tableIdentifier);
+ appendDataAsMultipleSnapshots(dataAcrossSnapshots);
table.refresh();
@@ -483,24 +442,16 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
table.refresh();
Assert.assertEquals(DataOperations.DELETE, table.currentSnapshot().operation());
- Dataset<Row> df = spark.readStream()
- .format("iceberg")
- .load(tableIdentifier);
- StreamingQuery streamingQuery = df.writeStream()
- .format("memory")
- .queryName("testtablewithdelete")
- .outputMode(OutputMode.Append())
- .start();
+ StreamingQuery query = startStream();
AssertHelpers.assertThrowsCause(
"Streaming should fail with IllegalStateException, as the snapshot is not of type APPEND",
IllegalStateException.class,
"Cannot process delete snapshot",
- () -> streamingQuery.processAllAvailable()
+ () -> query.processAllAvailable()
);
}
- @SuppressWarnings("unchecked")
@Test
public void testReadStreamWithSnapshotTypeDeleteAndSkipDeleteOption() throws Exception {
table.updateSpec()
@@ -512,7 +463,7 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
// fill table with some data
List<List<SimpleRecord>> dataAcrossSnapshots = TEST_DATA_MULTIPLE_SNAPSHOTS;
- appendDataAsMultipleSnapshots(dataAcrossSnapshots, tableIdentifier);
+ appendDataAsMultipleSnapshots(dataAcrossSnapshots);
table.refresh();
@@ -525,56 +476,62 @@ public final class TestStructuredStreamingRead3 extends SparkCatalogTestBase {
table.refresh();
Assert.assertEquals(DataOperations.DELETE, table.currentSnapshot().operation());
- Dataset<Row> df = spark.readStream()
- .format("iceberg")
- .option(SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS, "true")
- .load(tableIdentifier);
- Assertions.assertThat(processAvailable(df))
+ StreamingQuery query = startStream(SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS, "true");
+ Assertions.assertThat(rowsAvailable(query))
.containsExactlyInAnyOrderElementsOf(Iterables.concat(dataAcrossSnapshots));
}
- private static List<SimpleRecord> processMicroBatch(DataStreamWriter<Row> singleBatchWriter, String viewName)
- throws TimeoutException, StreamingQueryException {
- StreamingQuery streamingQuery = singleBatchWriter.start();
- streamingQuery.awaitTermination();
-
- return spark.sql(String.format("select * from %s", viewName))
- .as(Encoders.bean(SimpleRecord.class))
- .collectAsList();
- }
-
/**
* appends each list as a Snapshot on the iceberg table at the given location.
* accepts a list of lists - each list representing data per snapshot.
*/
- private static void appendDataAsMultipleSnapshots(List<List<SimpleRecord>> data, String tableIdentifier) {
+ private void appendDataAsMultipleSnapshots(List<List<SimpleRecord>> data) {
for (List<SimpleRecord> l : data) {
- appendData(l, tableIdentifier, "parquet");
+ appendData(l);
}
}
- private static void appendData(List<SimpleRecord> data, String tableIdentifier, String fileFormat) {
+ private void appendData(List<SimpleRecord> data) {
+ appendData(data, "parquet");
+ }
+
+ private void appendData(List<SimpleRecord> data, String format) {
Dataset<Row> df = spark.createDataFrame(data, SimpleRecord.class);
df.select("id", "data").write()
.format("iceberg")
- .option("write-format", fileFormat)
+ .option("write-format", format)
.mode("append")
- .save(tableIdentifier);
+ .save(tableName);
}
- private static List<SimpleRecord> processAvailable(Dataset<Row> df, String tableName) throws TimeoutException {
- StreamingQuery streamingQuery = df.writeStream()
+ private static final String MEMORY_TABLE = "_stream_view_mem";
+
+ private StreamingQuery startStream(Map<String, String> options) throws TimeoutException {
+ return spark.readStream()
+ .options(options)
+ .format("iceberg")
+ .load(tableName)
+ .writeStream()
+ .options(options)
.format("memory")
- .queryName(tableName)
+ .queryName(MEMORY_TABLE)
.outputMode(OutputMode.Append())
.start();
- streamingQuery.processAllAvailable();
- return spark.sql("select * from " + tableName)
+ }
+
+ private StreamingQuery startStream() throws TimeoutException {
+ return startStream(Collections.emptyMap());
+ }
+
+ private StreamingQuery startStream(String key, String value) throws TimeoutException {
+ return startStream(ImmutableMap.of(key, value));
+ }
+
+ private List<SimpleRecord> rowsAvailable(StreamingQuery query) {
+ query.processAllAvailable();
+ return spark.sql("select * from " + MEMORY_TABLE)
.as(Encoders.bean(SimpleRecord.class))
.collectAsList();
}
- private static List<SimpleRecord> processAvailable(Dataset<Row> df) throws TimeoutException {
- return processAvailable(df, "test12");
- }
}