Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/10/21 23:24:41 UTC
[iceberg] branch master updated: Spark 2.4: Use snapshot schema when time traveling (#1508)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new ffee32c Spark 2.4: Use snapshot schema when time traveling (#1508)
ffee32c is described below
commit ffee32c48859eef2136f728438b1da613429fe7e
Author: Wing Yew Poon <wy...@cloudera.com>
AuthorDate: Thu Oct 21 16:24:29 2021 -0700
Spark 2.4: Use snapshot schema when time traveling (#1508)
---
.../java/org/apache/iceberg/BaseTableScan.java | 28 +--
.../java/org/apache/iceberg/DataTableScan.java | 10 +
.../java/org/apache/iceberg/util/DateTimeUtil.java | 7 +
.../java/org/apache/iceberg/util/SnapshotUtil.java | 48 +++++
.../apache/iceberg/spark/source/IcebergSource.java | 4 +-
.../org/apache/iceberg/spark/source/Reader.java | 148 ++++++-------
.../spark/source/TestIcebergSourceHiveTables.java | 13 +-
.../spark/source/TestIcebergSourceTablesBase.java | 233 ++++++++++++++++++++-
.../spark/source/TestIcebergSourceHiveTables.java | 13 +-
.../spark/source/TestIcebergSourceTablesBase.java | 3 +-
10 files changed, 399 insertions(+), 108 deletions(-)
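
In practical terms, a Spark 2.4 time-travel read now reports and projects the schema that was current at the selected snapshot instead of the table's latest schema. A minimal sketch of such a read (the SparkSession `spark`, the table name, and `snapshotId` are illustrative, not part of the patch):

    import org.apache.iceberg.spark.SparkReadOptions;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    // Time travel to an older snapshot; with this change the returned schema
    // is the snapshot's schema, not the table's current schema.
    Dataset<Row> asOfSnapshot = spark.read()
        .format("iceberg")
        .option(SparkReadOptions.SNAPSHOT_ID, snapshotId)   // or AS_OF_TIMESTAMP
        .load("db.table");
    asOfSnapshot.printSchema();   // columns as they existed at that snapshot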
diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
index e890681..f942c51 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
@@ -19,10 +19,6 @@
package org.apache.iceberg;
-import java.time.Instant;
-import java.time.LocalDateTime;
-import java.time.ZoneOffset;
-import java.time.format.DateTimeFormatter;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
@@ -37,6 +33,8 @@ import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.TableScanUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,8 +45,6 @@ import org.slf4j.LoggerFactory;
abstract class BaseTableScan implements TableScan {
private static final Logger LOG = LoggerFactory.getLogger(BaseTableScan.class);
- private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
-
private final TableOperations ops;
private final Table table;
private final Schema schema;
@@ -132,19 +128,7 @@ abstract class BaseTableScan implements TableScan {
Preconditions.checkArgument(context.snapshotId() == null,
"Cannot override snapshot, already set to id=%s", context.snapshotId());
- Long lastSnapshotId = null;
- for (HistoryEntry logEntry : ops.current().snapshotLog()) {
- if (logEntry.timestampMillis() <= timestampMillis) {
- lastSnapshotId = logEntry.snapshotId();
- }
- }
-
- // the snapshot ID could be null if no entries were older than the requested time. in that case,
- // there is no valid snapshot to read.
- Preconditions.checkArgument(lastSnapshotId != null,
- "Cannot find a snapshot older than %s", formatTimestampMillis(timestampMillis));
-
- return useSnapshot(lastSnapshotId);
+ return useSnapshot(SnapshotUtil.snapshotIdAsOfTime(table(), timestampMillis));
}
@Override
@@ -199,7 +183,7 @@ abstract class BaseTableScan implements TableScan {
Snapshot snapshot = snapshot();
if (snapshot != null) {
LOG.info("Scanning table {} snapshot {} created at {} with filter {}", table,
- snapshot.snapshotId(), formatTimestampMillis(snapshot.timestampMillis()),
+ snapshot.snapshotId(), DateTimeUtil.formatTimestampMillis(snapshot.timestampMillis()),
context.rowFilter());
Listeners.notifyAll(
@@ -304,8 +288,4 @@ abstract class BaseTableScan implements TableScan {
return schema;
}
-
- private static String formatTimestampMillis(long millis) {
- return DATE_FORMAT.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneOffset.UTC));
- }
}
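
The timestamp-to-snapshot resolution formerly inlined in asOfTime() is now shared through SnapshotUtil (added further down in this patch). The two scans below therefore pin the same snapshot (`table` and `timestampMillis` are illustrative):

    import org.apache.iceberg.TableScan;
    import org.apache.iceberg.util.SnapshotUtil;

    TableScan byTime = table.newScan().asOfTime(timestampMillis);
    TableScan bySnapshotId = table.newScan()
        .useSnapshot(SnapshotUtil.snapshotIdAsOfTime(table, timestampMillis));
    // both select the most recent snapshot committed at or before timestampMillis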
diff --git a/core/src/main/java/org/apache/iceberg/DataTableScan.java b/core/src/main/java/org/apache/iceberg/DataTableScan.java
index fdcd5ed..58a7c9b 100644
--- a/core/src/main/java/org/apache/iceberg/DataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/DataTableScan.java
@@ -23,6 +23,7 @@ import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.ThreadPools;
public class DataTableScan extends BaseTableScan {
@@ -63,6 +64,15 @@ public class DataTableScan extends BaseTableScan {
}
@Override
+ public TableScan useSnapshot(long scanSnapshotId) {
+ // call method in superclass just for the side effect of argument validation;
+ // we do not use its return value
+ super.useSnapshot(scanSnapshotId);
+ Schema snapshotSchema = SnapshotUtil.schemaFor(table(), scanSnapshotId);
+ return newRefinedScan(tableOps(), table(), snapshotSchema, context().useSnapshotId(scanSnapshotId));
+ }
+
+ @Override
protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
return new DataTableScan(ops, table, schema, context);
}
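
This override is the core of the fix on the programmatic API: once a scan is pinned to a snapshot, its schema() is the schema recorded for that snapshot. A short sketch (`table` and `snapshotId` are illustrative):

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.TableScan;

    TableScan scan = table.newScan().useSnapshot(snapshotId);
    Schema schemaAtSnapshot = scan.schema();   // snapshot schema
    // before this patch, scan.schema() reflected table.schema(), i.e. the current schema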
diff --git a/core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java b/core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java
index 2dcaa9f..6b115b0 100644
--- a/core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java
@@ -25,12 +25,15 @@ import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
public class DateTimeUtil {
private DateTimeUtil() {
}
+ private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+
public static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
public static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
public static final long MICROS_PER_MILLIS = 1000L;
@@ -81,4 +84,8 @@ public class DateTimeUtil {
public static long microsFromTimestamptz(OffsetDateTime dateTime) {
return ChronoUnit.MICROS.between(EPOCH, dateTime);
}
+
+ public static String formatTimestampMillis(long millis) {
+ return DATE_FORMAT.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneOffset.UTC));
+ }
}
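
The relocated formatter renders epoch millis as a UTC timestamp; for example, formatTimestampMillis(0L) yields "1970-01-01 00:00:00.000":

    import org.apache.iceberg.util.DateTimeUtil;

    // render a millisecond timestamp in UTC for log and error messages
    String formatted = DateTimeUtil.formatTimestampMillis(System.currentTimeMillis());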
diff --git a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
index beafcc2..b2efe29 100644
--- a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
@@ -22,6 +22,8 @@ package org.apache.iceberg.util;
import java.util.List;
import java.util.function.Function;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.HistoryEntry;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.exceptions.ValidationException;
@@ -144,4 +146,50 @@ public class SnapshotUtil {
throw new IllegalStateException(
String.format("Cannot find snapshot after %s: not an ancestor of table's current snapshot", snapshotId));
}
+
+ /**
+ * Returns the ID of the most recent snapshot for the table as of the timestamp.
+ *
+ * @param table a {@link Table}
+ * @param timestampMillis the timestamp in millis since the Unix epoch
+ * @return the snapshot ID
+ * @throws IllegalArgumentException when no snapshot is found in the table
+ * older than the timestamp
+ */
+ public static long snapshotIdAsOfTime(Table table, long timestampMillis) {
+ Long snapshotId = null;
+ for (HistoryEntry logEntry : table.history()) {
+ if (logEntry.timestampMillis() <= timestampMillis) {
+ snapshotId = logEntry.snapshotId();
+ }
+ }
+
+ Preconditions.checkArgument(snapshotId != null,
+ "Cannot find a snapshot older than %s", DateTimeUtil.formatTimestampMillis(timestampMillis));
+ return snapshotId;
+ }
+
+ /**
+ * Returns the schema of the table for the specified snapshot.
+ *
+ * @param table a {@link Table}
+ * @param snapshotId the ID of the snapshot
+ * @return the schema
+ */
+ public static Schema schemaFor(Table table, long snapshotId) {
+ Snapshot snapshot = table.snapshot(snapshotId);
+ Preconditions.checkArgument(snapshot != null, "Cannot find snapshot with ID %s", snapshotId);
+ Integer schemaId = snapshot.schemaId();
+
+ // schemaId could be null, if snapshot was created before Iceberg added schema id to snapshot
+ if (schemaId != null) {
+ Schema schema = table.schemas().get(schemaId);
+ Preconditions.checkState(schema != null,
+ "Cannot find schema with schema id %s", schemaId);
+ return schema;
+ }
+
+ // TODO: recover the schema by reading previous metadata files
+ return table.schema();
+ }
}
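
Together, the two helpers above resolve a point in time to a snapshot and then to that snapshot's schema; schemaFor() falls back to table.schema() for snapshots written before schema IDs were tracked. Illustrative use (`table` and `timestampMillis` are assumed):

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.util.SnapshotUtil;

    // find the snapshot that was current at timestampMillis, then its schema
    long snapshotId = SnapshotUtil.snapshotIdAsOfTime(table, timestampMillis);
    Schema snapshotSchema = SnapshotUtil.schemaFor(table, snapshotId);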
diff --git a/spark/v2.4/spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java b/spark/v2.4/spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
index f0bc4ee..8976d66 100644
--- a/spark/v2.4/spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
+++ b/spark/v2.4/spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
@@ -68,8 +68,8 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
Reader reader = new Reader(lazySparkSession(), table, Boolean.parseBoolean(caseSensitive), options);
if (readSchema != null) {
- // convert() will fail if readSchema contains fields not in table.schema()
- SparkSchemaUtil.convert(table.schema(), readSchema);
+ // convert() will fail if readSchema contains fields not in reader.snapshotSchema()
+ SparkSchemaUtil.convert(reader.snapshotSchema(), readSchema);
reader.pruneColumns(readSchema);
}
diff --git a/spark/v2.4/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/v2.4/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
index a8a2b2f..2ac570d 100644
--- a/spark/v2.4/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark/v2.4/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -86,14 +86,7 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
private final JavaSparkContext sparkContext;
private final Table table;
private final SparkReadConf readConf;
- private final Long snapshotId;
- private final Long startSnapshotId;
- private final Long endSnapshotId;
- private final Long asOfTimestamp;
- private final Long splitSize;
- private final Integer splitLookback;
- private final Long splitOpenFileCost;
- private final boolean caseSensitive;
+ private final TableScan baseScan;
private StructType requestedSchema = null;
private List<Expression> filterExpressions = null;
private Filter[] pushedFilters = NO_FILTERS;
@@ -111,31 +104,9 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
this.table = table;
this.readConf = new SparkReadConf(spark, table, options.asMap());
- this.snapshotId = readConf.snapshotId();
- this.asOfTimestamp = readConf.asOfTimestamp();
- if (snapshotId != null && asOfTimestamp != null) {
- throw new IllegalArgumentException(
- "Cannot scan using both snapshot-id and as-of-timestamp to select the table snapshot");
- }
-
- this.startSnapshotId = readConf.startSnapshotId();
- this.endSnapshotId = readConf.endSnapshotId();
- if (snapshotId != null || asOfTimestamp != null) {
- if (startSnapshotId != null || endSnapshotId != null) {
- throw new IllegalArgumentException(
- "Cannot specify start-snapshot-id and end-snapshot-id to do incremental scan when either snapshot-id or " +
- "as-of-timestamp is specified");
- }
- } else {
- if (startSnapshotId == null && endSnapshotId != null) {
- throw new IllegalArgumentException("Cannot only specify option end-snapshot-id to do incremental scan");
- }
- }
- // look for split behavior overrides in options
- this.splitSize = options.get(SparkReadOptions.SPLIT_SIZE).map(Long::parseLong).orElse(null);
- this.splitLookback = options.get(SparkReadOptions.LOOKBACK).map(Integer::parseInt).orElse(null);
- this.splitOpenFileCost = options.get(SparkReadOptions.FILE_OPEN_COST).map(Long::parseLong).orElse(null);
+ this.baseScan = configureBaseScan(caseSensitive, options);
+ this.schema = baseScan.schema();
if (table.io() instanceof HadoopFileIO) {
String fsscheme = "no_exist";
@@ -157,18 +128,84 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
this.localityPreferred = false;
}
- this.schema = table.schema();
- this.caseSensitive = caseSensitive;
this.readTimestampWithoutZone = readConf.handleTimestampWithoutZone();
}
+ private void validateOptions(
+ Long snapshotId, Long asOfTimestamp, Long startSnapshotId, Long endSnapshotId) {
+ if (snapshotId != null && asOfTimestamp != null) {
+ throw new IllegalArgumentException(
+ "Cannot scan using both snapshot-id and as-of-timestamp to select the table snapshot");
+ }
+
+ if ((snapshotId != null || asOfTimestamp != null) &&
+ (startSnapshotId != null || endSnapshotId != null)) {
+ throw new IllegalArgumentException(
+ "Cannot specify start-snapshot-id and end-snapshot-id to do incremental scan when either snapshot-id or " +
+ "as-of-timestamp is specified");
+ }
+
+ if (startSnapshotId == null && endSnapshotId != null) {
+ throw new IllegalArgumentException("Cannot only specify option end-snapshot-id to do incremental scan");
+ }
+ }
+
+ private TableScan configureBaseScan(boolean caseSensitive, DataSourceOptions options) {
+ Long snapshotId = readConf.snapshotId();
+ Long asOfTimestamp = readConf.asOfTimestamp();
+ Long startSnapshotId = readConf.startSnapshotId();
+ Long endSnapshotId = readConf.endSnapshotId();
+ validateOptions(snapshotId, asOfTimestamp, startSnapshotId, endSnapshotId);
+
+ TableScan scan = table.newScan().caseSensitive(caseSensitive);
+
+ if (snapshotId != null) {
+ scan = scan.useSnapshot(snapshotId);
+ }
+
+ if (asOfTimestamp != null) {
+ scan = scan.asOfTime(asOfTimestamp);
+ }
+
+ if (startSnapshotId != null) {
+ if (endSnapshotId != null) {
+ scan = scan.appendsBetween(startSnapshotId, endSnapshotId);
+ } else {
+ scan = scan.appendsAfter(startSnapshotId);
+ }
+ }
+
+ // look for split behavior overrides in options
+ Long splitSize = options.get(SparkReadOptions.SPLIT_SIZE).map(Long::parseLong).orElse(null);
+ if (splitSize != null) {
+ scan = scan.option(TableProperties.SPLIT_SIZE, splitSize.toString());
+ }
+
+ Integer splitLookback = options.get(SparkReadOptions.LOOKBACK).map(Integer::parseInt).orElse(null);
+ if (splitLookback != null) {
+ scan = scan.option(TableProperties.SPLIT_LOOKBACK, splitLookback.toString());
+ }
+
+ Long splitOpenFileCost = options.get(SparkReadOptions.FILE_OPEN_COST).map(Long::parseLong).orElse(null);
+ if (splitOpenFileCost != null) {
+ scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, splitOpenFileCost.toString());
+ }
+
+ return scan;
+ }
+
+ protected Schema snapshotSchema() {
+ return baseScan.schema();
+ }
+
private Schema lazySchema() {
if (schema == null) {
if (requestedSchema != null) {
// the projection should include all columns that will be returned, including those only used in filters
- this.schema = SparkSchemaUtil.prune(table.schema(), requestedSchema, filterExpression(), caseSensitive);
+ this.schema = SparkSchemaUtil.prune(
+ baseScan.schema(), requestedSchema, filterExpression(), baseScan.isCaseSensitive());
} else {
- this.schema = table.schema();
+ this.schema = baseScan.schema();
}
}
return schema;
@@ -211,6 +248,7 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
List<CombinedScanTask> scanTasks = tasks();
+ boolean caseSensitive = baseScan.isCaseSensitive();
InputPartition<ColumnarBatch>[] readTasks = new InputPartition[scanTasks.size()];
Tasks.range(readTasks.length)
@@ -235,6 +273,7 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
List<CombinedScanTask> scanTasks = tasks();
+ boolean caseSensitive = baseScan.isCaseSensitive();
InputPartition<InternalRow>[] readTasks = new InputPartition[scanTasks.size()];
Tasks.range(readTasks.length)
@@ -378,38 +417,7 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
private List<CombinedScanTask> tasks() {
if (tasks == null) {
- TableScan scan = table
- .newScan()
- .caseSensitive(caseSensitive)
- .project(lazySchema());
-
- if (snapshotId != null) {
- scan = scan.useSnapshot(snapshotId);
- }
-
- if (asOfTimestamp != null) {
- scan = scan.asOfTime(asOfTimestamp);
- }
-
- if (startSnapshotId != null) {
- if (endSnapshotId != null) {
- scan = scan.appendsBetween(startSnapshotId, endSnapshotId);
- } else {
- scan = scan.appendsAfter(startSnapshotId);
- }
- }
-
- if (splitSize != null) {
- scan = scan.option(TableProperties.SPLIT_SIZE, splitSize.toString());
- }
-
- if (splitLookback != null) {
- scan = scan.option(TableProperties.SPLIT_LOOKBACK, splitLookback.toString());
- }
-
- if (splitOpenFileCost != null) {
- scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, splitOpenFileCost.toString());
- }
+ TableScan scan = baseScan.project(lazySchema());
if (filterExpressions != null) {
for (Expression filter : filterExpressions) {
@@ -430,8 +438,8 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
@Override
public String toString() {
return String.format(
- "IcebergScan(table=%s, type=%s, filters=%s, caseSensitive=%s, batchedReads=%s)",
- table, lazySchema().asStruct(), filterExpressions, caseSensitive, enableBatchRead());
+ "IcebergScan(table=%s, type=%s, filters=%s, batchedReads=%s)",
+ table, lazySchema().asStruct(), filterExpressions, enableBatchRead());
}
private static class ReadTask<T> implements Serializable, InputPartition<T> {
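
Because the scan is now configured in the Reader constructor, DataFrame reads through the source pick up the snapshot schema for both snapshot-id and as-of-timestamp time travel. A hedged sketch (`spark`, the table name, and `timestampMillis` are illustrative):

    import org.apache.iceberg.spark.SparkReadOptions;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    Dataset<Row> asOfTs = spark.read()
        .format("iceberg")
        .option(SparkReadOptions.AS_OF_TIMESTAMP, timestampMillis)
        .load("db.table");
    // asOfTs.schema() reflects the table schema as of that timestamp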
diff --git a/spark/v2.4/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java b/spark/v2.4/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
index a51f9ee..76923d4 100644
--- a/spark/v2.4/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
+++ b/spark/v2.4/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
@@ -44,11 +44,14 @@ public class TestIcebergSourceHiveTables extends TestIcebergSourceTablesBase {
@After
public void dropTable() throws IOException {
- Table table = catalog.loadTable(currentIdentifier);
- Path tablePath = new Path(table.location());
- FileSystem fs = tablePath.getFileSystem(spark.sessionState().newHadoopConf());
- fs.delete(tablePath, true);
- catalog.dropTable(currentIdentifier, false);
+ if (currentIdentifier != null) {
+ Table table = catalog.loadTable(currentIdentifier);
+ Path tablePath = new Path(table.location());
+ FileSystem fs = tablePath.getFileSystem(spark.sessionState().newHadoopConf());
+ fs.delete(tablePath, true);
+ catalog.dropTable(currentIdentifier, false);
+ currentIdentifier = null;
+ }
}
@Override
diff --git a/spark/v2.4/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v2.4/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 10b9d6f..93a3bf1 100644
--- a/spark/v2.4/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v2.4/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -43,6 +43,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.iceberg.spark.SparkTestBase;
@@ -52,7 +53,9 @@ import org.apache.spark.SparkException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.types.StructType;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -68,6 +71,17 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
optional(2, "data", Types.StringType.get())
);
+ private static final Schema SCHEMA2 = new Schema(
+ optional(1, "id", Types.IntegerType.get()),
+ optional(2, "data", Types.StringType.get()),
+ optional(3, "category", Types.StringType.get())
+ );
+
+ private static final Schema SCHEMA3 = new Schema(
+ optional(1, "id", Types.IntegerType.get()),
+ optional(3, "category", Types.StringType.get())
+ );
+
private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA).identity("id").build();
@Rule
@@ -1111,7 +1125,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
// check time travel
List<Row> actualAfterFirstCommit = spark.read()
.format("iceberg")
- .option("snapshot-id", String.valueOf(firstCommitId))
+ .option(SparkReadOptions.SNAPSHOT_ID, String.valueOf(firstCommitId))
.load(loadLocation(tableIdentifier, "partitions"))
.orderBy("partition.id")
.collectAsList();
@@ -1140,6 +1154,223 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
}
@Test
+ public synchronized void testSnapshotReadAfterAddColumn() {
+ TableIdentifier tableIdentifier = TableIdentifier.of("db", "table");
+ Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
+
+ List<Row> originalRecords = Lists.newArrayList(
+ RowFactory.create(1, "x"),
+ RowFactory.create(2, "y"),
+ RowFactory.create(3, "z"));
+
+ StructType originalSparkSchema = SparkSchemaUtil.convert(SCHEMA);
+ Dataset<Row> inputDf = spark.createDataFrame(originalRecords, originalSparkSchema);
+ inputDf.select("id", "data").write()
+ .format("iceberg")
+ .mode(SaveMode.Append)
+ .save(loadLocation(tableIdentifier));
+
+ table.refresh();
+
+ Dataset<Row> resultDf = spark.read()
+ .format("iceberg")
+ .load(loadLocation(tableIdentifier));
+ Assert.assertEquals("Records should match", originalRecords,
+ resultDf.orderBy("id").collectAsList());
+
+ Snapshot snapshot1 = table.currentSnapshot();
+
+ table.updateSchema().addColumn("category", Types.StringType.get()).commit();
+
+ List<Row> newRecords = Lists.newArrayList(
+ RowFactory.create(4, "xy", "B"),
+ RowFactory.create(5, "xyz", "C"));
+
+ StructType newSparkSchema = SparkSchemaUtil.convert(SCHEMA2);
+ Dataset<Row> inputDf2 = spark.createDataFrame(newRecords, newSparkSchema);
+ inputDf2.select("id", "data", "category").write()
+ .format("iceberg")
+ .mode(SaveMode.Append)
+ .save(loadLocation(tableIdentifier));
+
+ table.refresh();
+
+ List<Row> updatedRecords = Lists.newArrayList(
+ RowFactory.create(1, "x", null),
+ RowFactory.create(2, "y", null),
+ RowFactory.create(3, "z", null),
+ RowFactory.create(4, "xy", "B"),
+ RowFactory.create(5, "xyz", "C"));
+
+ Dataset<Row> resultDf2 = spark.read()
+ .format("iceberg")
+ .load(loadLocation(tableIdentifier));
+ Assert.assertEquals("Records should match", updatedRecords,
+ resultDf2.orderBy("id").collectAsList());
+
+ Dataset<Row> resultDf3 = spark.read()
+ .format("iceberg")
+ .option(SparkReadOptions.SNAPSHOT_ID, snapshot1.snapshotId())
+ .load(loadLocation(tableIdentifier));
+ Assert.assertEquals("Records should match", originalRecords,
+ resultDf3.orderBy("id").collectAsList());
+ Assert.assertEquals("Schemas should match", originalSparkSchema, resultDf3.schema());
+ }
+
+ @Test
+ public synchronized void testSnapshotReadAfterDropColumn() {
+ TableIdentifier tableIdentifier = TableIdentifier.of("db", "table");
+ Table table = createTable(tableIdentifier, SCHEMA2, PartitionSpec.unpartitioned());
+
+ List<Row> originalRecords = Lists.newArrayList(
+ RowFactory.create(1, "x", "A"),
+ RowFactory.create(2, "y", "A"),
+ RowFactory.create(3, "z", "B"));
+
+ StructType originalSparkSchema = SparkSchemaUtil.convert(SCHEMA2);
+ Dataset<Row> inputDf = spark.createDataFrame(originalRecords, originalSparkSchema);
+ inputDf.select("id", "data", "category").write()
+ .format("iceberg")
+ .mode(SaveMode.Append)
+ .save(loadLocation(tableIdentifier));
+
+ table.refresh();
+
+ Dataset<Row> resultDf = spark.read()
+ .format("iceberg")
+ .load(loadLocation(tableIdentifier));
+ Assert.assertEquals("Records should match", originalRecords,
+ resultDf.orderBy("id").collectAsList());
+
+ long tsBeforeDropColumn = waitUntilAfter(System.currentTimeMillis());
+ table.updateSchema().deleteColumn("data").commit();
+ long tsAfterDropColumn = waitUntilAfter(System.currentTimeMillis());
+
+ List<Row> newRecords = Lists.newArrayList(
+ RowFactory.create(4, "B"),
+ RowFactory.create(5, "C"));
+
+ StructType newSparkSchema = SparkSchemaUtil.convert(SCHEMA3);
+ Dataset<Row> inputDf2 = spark.createDataFrame(newRecords, newSparkSchema);
+ inputDf2.select("id", "category").write()
+ .format("iceberg")
+ .mode(SaveMode.Append)
+ .save(loadLocation(tableIdentifier));
+
+ table.refresh();
+
+ List<Row> updatedRecords = Lists.newArrayList(
+ RowFactory.create(1, "A"),
+ RowFactory.create(2, "A"),
+ RowFactory.create(3, "B"),
+ RowFactory.create(4, "B"),
+ RowFactory.create(5, "C"));
+
+ Dataset<Row> resultDf2 = spark.read()
+ .format("iceberg")
+ .load(loadLocation(tableIdentifier));
+ Assert.assertEquals("Records should match", updatedRecords,
+ resultDf2.orderBy("id").collectAsList());
+
+ Dataset<Row> resultDf3 = spark.read()
+ .format("iceberg")
+ .option(SparkReadOptions.AS_OF_TIMESTAMP, tsBeforeDropColumn)
+ .load(loadLocation(tableIdentifier));
+ Assert.assertEquals("Records should match", originalRecords,
+ resultDf3.orderBy("id").collectAsList());
+ Assert.assertEquals("Schemas should match", originalSparkSchema, resultDf3.schema());
+
+ // At tsAfterDropColumn, there has been a schema change, but no new snapshot,
+ // so the snapshot as of tsAfterDropColumn is the same as that as of tsBeforeDropColumn.
+ Dataset<Row> resultDf4 = spark.read()
+ .format("iceberg")
+ .option(SparkReadOptions.AS_OF_TIMESTAMP, tsAfterDropColumn)
+ .load(loadLocation(tableIdentifier));
+ Assert.assertEquals("Records should match", originalRecords,
+ resultDf4.orderBy("id").collectAsList());
+ Assert.assertEquals("Schemas should match", originalSparkSchema, resultDf4.schema());
+ }
+
+ @Test
+ public synchronized void testSnapshotReadAfterAddAndDropColumn() {
+ TableIdentifier tableIdentifier = TableIdentifier.of("db", "table");
+ Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
+
+ List<Row> originalRecords = Lists.newArrayList(
+ RowFactory.create(1, "x"),
+ RowFactory.create(2, "y"),
+ RowFactory.create(3, "z"));
+
+ StructType originalSparkSchema = SparkSchemaUtil.convert(SCHEMA);
+ Dataset<Row> inputDf = spark.createDataFrame(originalRecords, originalSparkSchema);
+ inputDf.select("id", "data").write()
+ .format("iceberg")
+ .mode(SaveMode.Append)
+ .save(loadLocation(tableIdentifier));
+
+ table.refresh();
+
+ Dataset<Row> resultDf = spark.read()
+ .format("iceberg")
+ .load(loadLocation(tableIdentifier));
+ Assert.assertEquals("Records should match", originalRecords,
+ resultDf.orderBy("id").collectAsList());
+
+ Snapshot snapshot1 = table.currentSnapshot();
+
+ table.updateSchema().addColumn("category", Types.StringType.get()).commit();
+
+ List<Row> newRecords = Lists.newArrayList(
+ RowFactory.create(4, "xy", "B"),
+ RowFactory.create(5, "xyz", "C"));
+
+ StructType sparkSchemaAfterAddColumn = SparkSchemaUtil.convert(SCHEMA2);
+ Dataset<Row> inputDf2 = spark.createDataFrame(newRecords, sparkSchemaAfterAddColumn);
+ inputDf2.select("id", "data", "category").write()
+ .format("iceberg")
+ .mode(SaveMode.Append)
+ .save(loadLocation(tableIdentifier));
+
+ table.refresh();
+
+ List<Row> updatedRecords = Lists.newArrayList(
+ RowFactory.create(1, "x", null),
+ RowFactory.create(2, "y", null),
+ RowFactory.create(3, "z", null),
+ RowFactory.create(4, "xy", "B"),
+ RowFactory.create(5, "xyz", "C"));
+
+ Dataset<Row> resultDf2 = spark.read()
+ .format("iceberg")
+ .load(loadLocation(tableIdentifier));
+ Assert.assertEquals("Records should match", updatedRecords,
+ resultDf2.orderBy("id").collectAsList());
+
+ table.updateSchema().deleteColumn("data").commit();
+
+ List<Row> recordsAfterDropColumn = Lists.newArrayList(
+ RowFactory.create(1, null),
+ RowFactory.create(2, null),
+ RowFactory.create(3, null),
+ RowFactory.create(4, "B"),
+ RowFactory.create(5, "C"));
+
+ Dataset<Row> resultDf3 = spark.read()
+ .format("iceberg")
+ .load(loadLocation(tableIdentifier));
+ Assert.assertEquals("Records should match", recordsAfterDropColumn,
+ resultDf3.orderBy("id").collectAsList());
+
+ Dataset<Row> resultDf4 = spark.read()
+ .format("iceberg")
+ .option(SparkReadOptions.SNAPSHOT_ID, snapshot1.snapshotId())
+ .load(loadLocation(tableIdentifier));
+ Assert.assertEquals("Records should match", originalRecords,
+ resultDf4.orderBy("id").collectAsList());
+ Assert.assertEquals("Schemas should match", originalSparkSchema, resultDf4.schema());
+ }
+
+ @Test
public void testRemoveOrphanFilesActionSupport() throws InterruptedException {
TableIdentifier tableIdentifier = TableIdentifier.of("db", "table");
Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
diff --git a/spark/v3.0/spark3/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java b/spark/v3.0/spark3/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
index a51f9ee..76923d4 100644
--- a/spark/v3.0/spark3/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
+++ b/spark/v3.0/spark3/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
@@ -44,11 +44,14 @@ public class TestIcebergSourceHiveTables extends TestIcebergSourceTablesBase {
@After
public void dropTable() throws IOException {
- Table table = catalog.loadTable(currentIdentifier);
- Path tablePath = new Path(table.location());
- FileSystem fs = tablePath.getFileSystem(spark.sessionState().newHadoopConf());
- fs.delete(tablePath, true);
- catalog.dropTable(currentIdentifier, false);
+ if (currentIdentifier != null) {
+ Table table = catalog.loadTable(currentIdentifier);
+ Path tablePath = new Path(table.location());
+ FileSystem fs = tablePath.getFileSystem(spark.sessionState().newHadoopConf());
+ fs.delete(tablePath, true);
+ catalog.dropTable(currentIdentifier, false);
+ currentIdentifier = null;
+ }
}
@Override
diff --git a/spark/v3.0/spark3/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.0/spark3/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 10b9d6f..fc6d1c5 100644
--- a/spark/v3.0/spark3/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.0/spark3/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -43,6 +43,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.iceberg.spark.SparkTestBase;
@@ -1111,7 +1112,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
// check time travel
List<Row> actualAfterFirstCommit = spark.read()
.format("iceberg")
- .option("snapshot-id", String.valueOf(firstCommitId))
+ .option(SparkReadOptions.SNAPSHOT_ID, String.valueOf(firstCommitId))
.load(loadLocation(tableIdentifier, "partitions"))
.orderBy("partition.id")
.collectAsList();