You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2021/10/21 23:24:41 UTC

[iceberg] branch master updated: Spark 2.4: Use snapshot schema when time traveling (#1508)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new ffee32c  Spark 2.4: Use snapshot schema when time traveling (#1508)
ffee32c is described below

commit ffee32c48859eef2136f728438b1da613429fe7e
Author: Wing Yew Poon <wy...@cloudera.com>
AuthorDate: Thu Oct 21 16:24:29 2021 -0700

    Spark 2.4: Use snapshot schema when time traveling (#1508)
---
 .../java/org/apache/iceberg/BaseTableScan.java     |  28 +--
 .../java/org/apache/iceberg/DataTableScan.java     |  10 +
 .../java/org/apache/iceberg/util/DateTimeUtil.java |   7 +
 .../java/org/apache/iceberg/util/SnapshotUtil.java |  48 +++++
 .../apache/iceberg/spark/source/IcebergSource.java |   4 +-
 .../org/apache/iceberg/spark/source/Reader.java    | 148 ++++++-------
 .../spark/source/TestIcebergSourceHiveTables.java  |  13 +-
 .../spark/source/TestIcebergSourceTablesBase.java  | 233 ++++++++++++++++++++-
 .../spark/source/TestIcebergSourceHiveTables.java  |  13 +-
 .../spark/source/TestIcebergSourceTablesBase.java  |   3 +-
 10 files changed, 399 insertions(+), 108 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/BaseTableScan.java b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
index e890681..f942c51 100644
--- a/core/src/main/java/org/apache/iceberg/BaseTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseTableScan.java
@@ -19,10 +19,6 @@
 
 package org.apache.iceberg;
 
-import java.time.Instant;
-import java.time.LocalDateTime;
-import java.time.ZoneOffset;
-import java.time.format.DateTimeFormatter;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
@@ -37,6 +33,8 @@ import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.TableScanUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,8 +45,6 @@ import org.slf4j.LoggerFactory;
 abstract class BaseTableScan implements TableScan {
   private static final Logger LOG = LoggerFactory.getLogger(BaseTableScan.class);
 
-  private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
-
   private final TableOperations ops;
   private final Table table;
   private final Schema schema;
@@ -132,19 +128,7 @@ abstract class BaseTableScan implements TableScan {
     Preconditions.checkArgument(context.snapshotId() == null,
         "Cannot override snapshot, already set to id=%s", context.snapshotId());
 
-    Long lastSnapshotId = null;
-    for (HistoryEntry logEntry : ops.current().snapshotLog()) {
-      if (logEntry.timestampMillis() <= timestampMillis) {
-        lastSnapshotId = logEntry.snapshotId();
-      }
-    }
-
-    // the snapshot ID could be null if no entries were older than the requested time. in that case,
-    // there is no valid snapshot to read.
-    Preconditions.checkArgument(lastSnapshotId != null,
-        "Cannot find a snapshot older than %s", formatTimestampMillis(timestampMillis));
-
-    return useSnapshot(lastSnapshotId);
+    return useSnapshot(SnapshotUtil.snapshotIdAsOfTime(table(), timestampMillis));
   }
 
   @Override
@@ -199,7 +183,7 @@ abstract class BaseTableScan implements TableScan {
     Snapshot snapshot = snapshot();
     if (snapshot != null) {
       LOG.info("Scanning table {} snapshot {} created at {} with filter {}", table,
-          snapshot.snapshotId(), formatTimestampMillis(snapshot.timestampMillis()),
+          snapshot.snapshotId(), DateTimeUtil.formatTimestampMillis(snapshot.timestampMillis()),
           context.rowFilter());
 
       Listeners.notifyAll(
@@ -304,8 +288,4 @@ abstract class BaseTableScan implements TableScan {
 
     return schema;
   }
-
-  private static String formatTimestampMillis(long millis) {
-    return DATE_FORMAT.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneOffset.UTC));
-  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/DataTableScan.java b/core/src/main/java/org/apache/iceberg/DataTableScan.java
index fdcd5ed..58a7c9b 100644
--- a/core/src/main/java/org/apache/iceberg/DataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/DataTableScan.java
@@ -23,6 +23,7 @@ import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.ThreadPools;
 
 public class DataTableScan extends BaseTableScan {
@@ -63,6 +64,15 @@ public class DataTableScan extends BaseTableScan {
   }
 
   @Override
+  public TableScan useSnapshot(long scanSnapshotId) {
+    // call method in superclass just for the side effect of argument validation;
+    // we do not use its return value
+    super.useSnapshot(scanSnapshotId);
+    Schema snapshotSchema = SnapshotUtil.schemaFor(table(), scanSnapshotId);
+    return newRefinedScan(tableOps(), table(), snapshotSchema, context().useSnapshotId(scanSnapshotId));
+  }
+
+  @Override
   protected TableScan newRefinedScan(TableOperations ops, Table table, Schema schema, TableScanContext context) {
     return new DataTableScan(ops, table, schema, context);
   }
diff --git a/core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java b/core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java
index 2dcaa9f..6b115b0 100644
--- a/core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/DateTimeUtil.java
@@ -25,12 +25,15 @@ import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
 import java.time.temporal.ChronoUnit;
 
 public class DateTimeUtil {
   private DateTimeUtil() {
   }
 
+  private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+
   public static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
   public static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
   public static final long MICROS_PER_MILLIS = 1000L;
@@ -81,4 +84,8 @@ public class DateTimeUtil {
   public static long microsFromTimestamptz(OffsetDateTime dateTime) {
     return ChronoUnit.MICROS.between(EPOCH, dateTime);
   }
+
+  public static String formatTimestampMillis(long millis) {
+    return DATE_FORMAT.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneOffset.UTC));
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
index beafcc2..b2efe29 100644
--- a/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java
@@ -22,6 +22,8 @@ package org.apache.iceberg.util;
 import java.util.List;
 import java.util.function.Function;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.HistoryEntry;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.exceptions.ValidationException;
@@ -144,4 +146,50 @@ public class SnapshotUtil {
     throw new IllegalStateException(
         String.format("Cannot find snapshot after %s: not an ancestor of table's current snapshot", snapshotId));
   }
+
+  /**
+   * Returns the ID of the most recent snapshot for the table as of the timestamp.
+   *
+   * @param table a {@link Table}
+   * @param timestampMillis the timestamp in millis since the Unix epoch
+   * @return the snapshot ID
+   * @throws IllegalArgumentException when the table history has no snapshot
+   * at or before the timestamp
+   */
+  public static long snapshotIdAsOfTime(Table table, long timestampMillis) {
+    Long snapshotId = null;
+    for (HistoryEntry logEntry : table.history()) {
+      if (logEntry.timestampMillis() <= timestampMillis) {
+        snapshotId = logEntry.snapshotId();
+      }
+    }
+
+    Preconditions.checkArgument(snapshotId != null,
+        "Cannot find a snapshot older than %s", DateTimeUtil.formatTimestampMillis(timestampMillis));
+    return snapshotId;
+  }
+
+  /**
+   * Returns the schema of the table for the specified snapshot.
+   *
+   * @param table a {@link Table}
+   * @param snapshotId the ID of the snapshot
+   * @return the schema
+   */
+  public static Schema schemaFor(Table table, long snapshotId) {
+    Snapshot snapshot = table.snapshot(snapshotId);
+    Preconditions.checkArgument(snapshot != null, "Cannot find snapshot with ID %s", snapshotId);
+    Integer schemaId = snapshot.schemaId();
+
+    // schemaId could be null, if snapshot was created before Iceberg added schema id to snapshot
+    if (schemaId != null) {
+      Schema schema = table.schemas().get(schemaId);
+      Preconditions.checkState(schema != null,
+          "Cannot find schema with schema id %s", schemaId);
+      return schema;
+    }
+
+    // TODO: recover the schema by reading previous metadata files
+    return table.schema();
+  }
 }
diff --git a/spark/v2.4/spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java b/spark/v2.4/spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
index f0bc4ee..8976d66 100644
--- a/spark/v2.4/spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
+++ b/spark/v2.4/spark2/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java
@@ -68,8 +68,8 @@ public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, D
 
     Reader reader = new Reader(lazySparkSession(), table, Boolean.parseBoolean(caseSensitive), options);
     if (readSchema != null) {
-      // convert() will fail if readSchema contains fields not in table.schema()
-      SparkSchemaUtil.convert(table.schema(), readSchema);
+      // convert() will fail if readSchema contains fields not in reader.snapshotSchema()
+      SparkSchemaUtil.convert(reader.snapshotSchema(), readSchema);
       reader.pruneColumns(readSchema);
     }
 
diff --git a/spark/v2.4/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/v2.4/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
index a8a2b2f..2ac570d 100644
--- a/spark/v2.4/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark/v2.4/spark2/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -86,14 +86,7 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
   private final JavaSparkContext sparkContext;
   private final Table table;
   private final SparkReadConf readConf;
-  private final Long snapshotId;
-  private final Long startSnapshotId;
-  private final Long endSnapshotId;
-  private final Long asOfTimestamp;
-  private final Long splitSize;
-  private final Integer splitLookback;
-  private final Long splitOpenFileCost;
-  private final boolean caseSensitive;
+  private final TableScan baseScan;
   private StructType requestedSchema = null;
   private List<Expression> filterExpressions = null;
   private Filter[] pushedFilters = NO_FILTERS;
@@ -111,31 +104,9 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
     this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
     this.table = table;
     this.readConf = new SparkReadConf(spark, table, options.asMap());
-    this.snapshotId = readConf.snapshotId();
-    this.asOfTimestamp = readConf.asOfTimestamp();
-    if (snapshotId != null && asOfTimestamp != null) {
-      throw new IllegalArgumentException(
-          "Cannot scan using both snapshot-id and as-of-timestamp to select the table snapshot");
-    }
-
-    this.startSnapshotId = readConf.startSnapshotId();
-    this.endSnapshotId = readConf.endSnapshotId();
-    if (snapshotId != null || asOfTimestamp != null) {
-      if (startSnapshotId != null || endSnapshotId != null) {
-        throw new IllegalArgumentException(
-            "Cannot specify start-snapshot-id and end-snapshot-id to do incremental scan when either snapshot-id or " +
-                "as-of-timestamp is specified");
-      }
-    } else {
-      if (startSnapshotId == null && endSnapshotId != null) {
-        throw new IllegalArgumentException("Cannot only specify option end-snapshot-id to do incremental scan");
-      }
-    }
 
-    // look for split behavior overrides in options
-    this.splitSize = options.get(SparkReadOptions.SPLIT_SIZE).map(Long::parseLong).orElse(null);
-    this.splitLookback = options.get(SparkReadOptions.LOOKBACK).map(Integer::parseInt).orElse(null);
-    this.splitOpenFileCost = options.get(SparkReadOptions.FILE_OPEN_COST).map(Long::parseLong).orElse(null);
+    this.baseScan = configureBaseScan(caseSensitive, options);
+    this.schema = baseScan.schema();
 
     if (table.io() instanceof HadoopFileIO) {
       String fsscheme = "no_exist";
@@ -157,18 +128,84 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
       this.localityPreferred = false;
     }
 
-    this.schema = table.schema();
-    this.caseSensitive = caseSensitive;
     this.readTimestampWithoutZone = readConf.handleTimestampWithoutZone();
   }
 
+  private void validateOptions(
+      Long snapshotId, Long asOfTimestamp, Long startSnapshotId, Long endSnapshotId) {
+    if (snapshotId != null && asOfTimestamp != null) {
+      throw new IllegalArgumentException(
+          "Cannot scan using both snapshot-id and as-of-timestamp to select the table snapshot");
+    }
+
+    if ((snapshotId != null || asOfTimestamp != null) &&
+        (startSnapshotId != null || endSnapshotId != null)) {
+      throw new IllegalArgumentException(
+          "Cannot specify start-snapshot-id and end-snapshot-id to do incremental scan when either snapshot-id or " +
+          "as-of-timestamp is specified");
+    }
+
+    if (startSnapshotId == null && endSnapshotId != null) {
+      throw new IllegalArgumentException("Cannot only specify option end-snapshot-id to do incremental scan");
+    }
+  }
+
+  private TableScan configureBaseScan(boolean caseSensitive, DataSourceOptions options) {
+    Long snapshotId = readConf.snapshotId();
+    Long asOfTimestamp = readConf.asOfTimestamp();
+    Long startSnapshotId = readConf.startSnapshotId();
+    Long endSnapshotId = readConf.endSnapshotId();
+    validateOptions(snapshotId, asOfTimestamp, startSnapshotId, endSnapshotId);
+
+    TableScan scan = table.newScan().caseSensitive(caseSensitive);
+
+    if (snapshotId != null) {
+      scan = scan.useSnapshot(snapshotId);
+    }
+
+    if (asOfTimestamp != null) {
+      scan = scan.asOfTime(asOfTimestamp);
+    }
+
+    if (startSnapshotId != null) {
+      if (endSnapshotId != null) {
+        scan = scan.appendsBetween(startSnapshotId, endSnapshotId);
+      } else {
+        scan = scan.appendsAfter(startSnapshotId);
+      }
+    }
+
+    // look for split behavior overrides in options
+    Long splitSize = options.get(SparkReadOptions.SPLIT_SIZE).map(Long::parseLong).orElse(null);
+    if (splitSize != null) {
+      scan = scan.option(TableProperties.SPLIT_SIZE, splitSize.toString());
+    }
+
+    Integer splitLookback = options.get(SparkReadOptions.LOOKBACK).map(Integer::parseInt).orElse(null);
+    if (splitLookback != null) {
+      scan = scan.option(TableProperties.SPLIT_LOOKBACK, splitLookback.toString());
+    }
+
+    Long splitOpenFileCost = options.get(SparkReadOptions.FILE_OPEN_COST).map(Long::parseLong).orElse(null);
+    if (splitOpenFileCost != null) {
+      scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, splitOpenFileCost.toString());
+    }
+
+    return scan;
+  }
+
+  protected Schema snapshotSchema() {
+    return baseScan.schema();
+  }
+
   private Schema lazySchema() {
     if (schema == null) {
       if (requestedSchema != null) {
         // the projection should include all columns that will be returned, including those only used in filters
-        this.schema = SparkSchemaUtil.prune(table.schema(), requestedSchema, filterExpression(), caseSensitive);
+        this.schema = SparkSchemaUtil.prune(
+            baseScan.schema(), requestedSchema, filterExpression(), baseScan.isCaseSensitive());
       } else {
-        this.schema = table.schema();
+        this.schema = baseScan.schema();
       }
     }
     return schema;
@@ -211,6 +248,7 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
     Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
 
     List<CombinedScanTask> scanTasks = tasks();
+    boolean caseSensitive = baseScan.isCaseSensitive();
     InputPartition<ColumnarBatch>[] readTasks = new InputPartition[scanTasks.size()];
 
     Tasks.range(readTasks.length)
@@ -235,6 +273,7 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
     Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
 
     List<CombinedScanTask> scanTasks = tasks();
+    boolean caseSensitive = baseScan.isCaseSensitive();
     InputPartition<InternalRow>[] readTasks = new InputPartition[scanTasks.size()];
 
     Tasks.range(readTasks.length)
@@ -378,38 +417,7 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
 
   private List<CombinedScanTask> tasks() {
     if (tasks == null) {
-      TableScan scan = table
-          .newScan()
-          .caseSensitive(caseSensitive)
-          .project(lazySchema());
-
-      if (snapshotId != null) {
-        scan = scan.useSnapshot(snapshotId);
-      }
-
-      if (asOfTimestamp != null) {
-        scan = scan.asOfTime(asOfTimestamp);
-      }
-
-      if (startSnapshotId != null) {
-        if (endSnapshotId != null) {
-          scan = scan.appendsBetween(startSnapshotId, endSnapshotId);
-        } else {
-          scan = scan.appendsAfter(startSnapshotId);
-        }
-      }
-
-      if (splitSize != null) {
-        scan = scan.option(TableProperties.SPLIT_SIZE, splitSize.toString());
-      }
-
-      if (splitLookback != null) {
-        scan = scan.option(TableProperties.SPLIT_LOOKBACK, splitLookback.toString());
-      }
-
-      if (splitOpenFileCost != null) {
-        scan = scan.option(TableProperties.SPLIT_OPEN_FILE_COST, splitOpenFileCost.toString());
-      }
+      TableScan scan = baseScan.project(lazySchema());
 
       if (filterExpressions != null) {
         for (Expression filter : filterExpressions) {
@@ -430,8 +438,8 @@ class Reader implements DataSourceReader, SupportsScanColumnarBatch, SupportsPus
   @Override
   public String toString() {
     return String.format(
-        "IcebergScan(table=%s, type=%s, filters=%s, caseSensitive=%s, batchedReads=%s)",
-        table, lazySchema().asStruct(), filterExpressions, caseSensitive, enableBatchRead());
+        "IcebergScan(table=%s, type=%s, filters=%s, batchedReads=%s)",
+        table, lazySchema().asStruct(), filterExpressions, enableBatchRead());
   }
 
   private static class ReadTask<T> implements Serializable, InputPartition<T> {
diff --git a/spark/v2.4/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java b/spark/v2.4/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
index a51f9ee..76923d4 100644
--- a/spark/v2.4/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
+++ b/spark/v2.4/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
@@ -44,11 +44,14 @@ public class TestIcebergSourceHiveTables extends TestIcebergSourceTablesBase {
 
   @After
   public void dropTable() throws IOException {
-    Table table = catalog.loadTable(currentIdentifier);
-    Path tablePath = new Path(table.location());
-    FileSystem fs = tablePath.getFileSystem(spark.sessionState().newHadoopConf());
-    fs.delete(tablePath, true);
-    catalog.dropTable(currentIdentifier, false);
+    if (currentIdentifier != null) {
+      Table table = catalog.loadTable(currentIdentifier);
+      Path tablePath = new Path(table.location());
+      FileSystem fs = tablePath.getFileSystem(spark.sessionState().newHadoopConf());
+      fs.delete(tablePath, true);
+      catalog.dropTable(currentIdentifier, false);
+      currentIdentifier = null;
+    }
   }
 
   @Override
diff --git a/spark/v2.4/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v2.4/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 10b9d6f..93a3bf1 100644
--- a/spark/v2.4/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v2.4/spark2/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -43,6 +43,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkTableUtil;
 import org.apache.iceberg.spark.SparkTestBase;
@@ -52,7 +53,9 @@ import org.apache.spark.SparkException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.SaveMode;
+import org.apache.spark.sql.types.StructType;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -68,6 +71,17 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
       optional(2, "data", Types.StringType.get())
   );
 
+  private static final Schema SCHEMA2 = new Schema(
+      optional(1, "id", Types.IntegerType.get()),
+      optional(2, "data", Types.StringType.get()),
+      optional(3, "category", Types.StringType.get())
+  );
+
+  private static final Schema SCHEMA3 = new Schema(
+      optional(1, "id", Types.IntegerType.get()),
+      optional(3, "category", Types.StringType.get())
+  );
+
   private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA).identity("id").build();
 
   @Rule
@@ -1111,7 +1125,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
     // check time travel
     List<Row> actualAfterFirstCommit = spark.read()
         .format("iceberg")
-        .option("snapshot-id", String.valueOf(firstCommitId))
+        .option(SparkReadOptions.SNAPSHOT_ID, String.valueOf(firstCommitId))
         .load(loadLocation(tableIdentifier, "partitions"))
         .orderBy("partition.id")
         .collectAsList();
@@ -1140,6 +1154,223 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
   }
 
   @Test
+  public synchronized void testSnapshotReadAfterAddColumn() {
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", "table");
+    Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
+
+    List<Row> originalRecords = Lists.newArrayList(
+        RowFactory.create(1, "x"),
+        RowFactory.create(2, "y"),
+        RowFactory.create(3, "z"));
+
+    StructType originalSparkSchema = SparkSchemaUtil.convert(SCHEMA);
+    Dataset<Row> inputDf = spark.createDataFrame(originalRecords, originalSparkSchema);
+    inputDf.select("id", "data").write()
+        .format("iceberg")
+        .mode(SaveMode.Append)
+        .save(loadLocation(tableIdentifier));
+
+    table.refresh();
+
+    Dataset<Row> resultDf = spark.read()
+        .format("iceberg")
+        .load(loadLocation(tableIdentifier));
+    Assert.assertEquals("Records should match", originalRecords,
+        resultDf.orderBy("id").collectAsList());
+
+    Snapshot snapshot1 = table.currentSnapshot();
+
+    table.updateSchema().addColumn("category", Types.StringType.get()).commit();
+
+    List<Row> newRecords = Lists.newArrayList(
+        RowFactory.create(4, "xy", "B"),
+        RowFactory.create(5, "xyz", "C"));
+
+    StructType newSparkSchema = SparkSchemaUtil.convert(SCHEMA2);
+    Dataset<Row> inputDf2 = spark.createDataFrame(newRecords, newSparkSchema);
+    inputDf2.select("id", "data", "category").write()
+        .format("iceberg")
+        .mode(SaveMode.Append)
+        .save(loadLocation(tableIdentifier));
+
+    table.refresh();
+
+    List<Row> updatedRecords = Lists.newArrayList(
+        RowFactory.create(1, "x", null),
+        RowFactory.create(2, "y", null),
+        RowFactory.create(3, "z", null),
+        RowFactory.create(4, "xy", "B"),
+        RowFactory.create(5, "xyz", "C"));
+
+    Dataset<Row> resultDf2 = spark.read()
+        .format("iceberg")
+        .load(loadLocation(tableIdentifier));
+    Assert.assertEquals("Records should match", updatedRecords,
+        resultDf2.orderBy("id").collectAsList());
+
+    Dataset<Row> resultDf3 = spark.read()
+        .format("iceberg")
+        .option(SparkReadOptions.SNAPSHOT_ID, snapshot1.snapshotId())
+        .load(loadLocation(tableIdentifier));
+    Assert.assertEquals("Records should match", originalRecords,
+        resultDf3.orderBy("id").collectAsList());
+    Assert.assertEquals("Schemas should match", originalSparkSchema, resultDf3.schema());
+  }
+
+  @Test
+  public synchronized void testSnapshotReadAfterDropColumn() {
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", "table");
+    Table table = createTable(tableIdentifier, SCHEMA2, PartitionSpec.unpartitioned());
+
+    List<Row> originalRecords = Lists.newArrayList(
+        RowFactory.create(1, "x", "A"),
+        RowFactory.create(2, "y", "A"),
+        RowFactory.create(3, "z", "B"));
+
+    StructType originalSparkSchema = SparkSchemaUtil.convert(SCHEMA2);
+    Dataset<Row> inputDf = spark.createDataFrame(originalRecords, originalSparkSchema);
+    inputDf.select("id", "data", "category").write()
+        .format("iceberg")
+        .mode(SaveMode.Append)
+        .save(loadLocation(tableIdentifier));
+
+    table.refresh();
+
+    Dataset<Row> resultDf = spark.read()
+        .format("iceberg")
+        .load(loadLocation(tableIdentifier));
+    Assert.assertEquals("Records should match", originalRecords,
+        resultDf.orderBy("id").collectAsList());
+
+    long tsBeforeDropColumn = waitUntilAfter(System.currentTimeMillis());
+    table.updateSchema().deleteColumn("data").commit();
+    long tsAfterDropColumn = waitUntilAfter(System.currentTimeMillis());
+
+    List<Row> newRecords = Lists.newArrayList(
+        RowFactory.create(4, "B"),
+        RowFactory.create(5, "C"));
+
+    StructType newSparkSchema = SparkSchemaUtil.convert(SCHEMA3);
+    Dataset<Row> inputDf2 = spark.createDataFrame(newRecords, newSparkSchema);
+    inputDf2.select("id", "category").write()
+        .format("iceberg")
+        .mode(SaveMode.Append)
+        .save(loadLocation(tableIdentifier));
+
+    table.refresh();
+
+    List<Row> updatedRecords = Lists.newArrayList(
+        RowFactory.create(1, "A"),
+        RowFactory.create(2, "A"),
+        RowFactory.create(3, "B"),
+        RowFactory.create(4, "B"),
+        RowFactory.create(5, "C"));
+
+    Dataset<Row> resultDf2 = spark.read()
+        .format("iceberg")
+        .load(loadLocation(tableIdentifier));
+    Assert.assertEquals("Records should match", updatedRecords,
+        resultDf2.orderBy("id").collectAsList());
+
+    Dataset<Row> resultDf3 = spark.read()
+        .format("iceberg")
+        .option(SparkReadOptions.AS_OF_TIMESTAMP, tsBeforeDropColumn)
+        .load(loadLocation(tableIdentifier));
+    Assert.assertEquals("Records should match", originalRecords,
+        resultDf3.orderBy("id").collectAsList());
+    Assert.assertEquals("Schemas should match", originalSparkSchema, resultDf3.schema());
+
+    // At tsAfterDropColumn, there has been a schema change, but no new snapshot,
+    // so the snapshot as of tsAfterDropColumn is the same as that as of tsBeforeDropColumn.
+    Dataset<Row> resultDf4 = spark.read()
+        .format("iceberg")
+        .option(SparkReadOptions.AS_OF_TIMESTAMP, tsAfterDropColumn)
+        .load(loadLocation(tableIdentifier));
+    Assert.assertEquals("Records should match", originalRecords,
+        resultDf4.orderBy("id").collectAsList());
+    Assert.assertEquals("Schemas should match", originalSparkSchema, resultDf4.schema());
+  }
+
+  @Test
+  public synchronized void testSnapshotReadAfterAddAndDropColumn() {
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", "table");
+    Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
+
+    List<Row> originalRecords = Lists.newArrayList(
+        RowFactory.create(1, "x"),
+        RowFactory.create(2, "y"),
+        RowFactory.create(3, "z"));
+
+    StructType originalSparkSchema = SparkSchemaUtil.convert(SCHEMA);
+    Dataset<Row> inputDf = spark.createDataFrame(originalRecords, originalSparkSchema);
+    inputDf.select("id", "data").write()
+        .format("iceberg")
+        .mode(SaveMode.Append)
+        .save(loadLocation(tableIdentifier));
+
+    table.refresh();
+
+    Dataset<Row> resultDf = spark.read()
+        .format("iceberg")
+        .load(loadLocation(tableIdentifier));
+    Assert.assertEquals("Records should match", originalRecords,
+        resultDf.orderBy("id").collectAsList());
+
+    Snapshot snapshot1 = table.currentSnapshot();
+
+    table.updateSchema().addColumn("category", Types.StringType.get()).commit();
+
+    List<Row> newRecords = Lists.newArrayList(
+        RowFactory.create(4, "xy", "B"),
+        RowFactory.create(5, "xyz", "C"));
+
+    StructType sparkSchemaAfterAddColumn = SparkSchemaUtil.convert(SCHEMA2);
+    Dataset<Row> inputDf2 = spark.createDataFrame(newRecords, sparkSchemaAfterAddColumn);
+    inputDf2.select("id", "data", "category").write()
+        .format("iceberg")
+        .mode(SaveMode.Append)
+        .save(loadLocation(tableIdentifier));
+
+    table.refresh();
+
+    List<Row> updatedRecords = Lists.newArrayList(
+        RowFactory.create(1, "x", null),
+        RowFactory.create(2, "y", null),
+        RowFactory.create(3, "z", null),
+        RowFactory.create(4, "xy", "B"),
+        RowFactory.create(5, "xyz", "C"));
+
+    Dataset<Row> resultDf2 = spark.read()
+        .format("iceberg")
+        .load(loadLocation(tableIdentifier));
+    Assert.assertEquals("Records should match", updatedRecords,
+        resultDf2.orderBy("id").collectAsList());
+
+    table.updateSchema().deleteColumn("data").commit();
+
+    List<Row> recordsAfterDropColumn = Lists.newArrayList(
+        RowFactory.create(1, null),
+        RowFactory.create(2, null),
+        RowFactory.create(3, null),
+        RowFactory.create(4, "B"),
+        RowFactory.create(5, "C"));
+
+    Dataset<Row> resultDf3 = spark.read()
+        .format("iceberg")
+        .load(loadLocation(tableIdentifier));
+    Assert.assertEquals("Records should match", recordsAfterDropColumn,
+        resultDf3.orderBy("id").collectAsList());
+
+    Dataset<Row> resultDf4 = spark.read()
+        .format("iceberg")
+        .option(SparkReadOptions.SNAPSHOT_ID, snapshot1.snapshotId())
+        .load(loadLocation(tableIdentifier));
+    Assert.assertEquals("Records should match", originalRecords,
+        resultDf4.orderBy("id").collectAsList());
+    Assert.assertEquals("Schemas should match", originalSparkSchema, resultDf4.schema());
+  }
+
+  @Test
   public void testRemoveOrphanFilesActionSupport() throws InterruptedException {
     TableIdentifier tableIdentifier = TableIdentifier.of("db", "table");
     Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
diff --git a/spark/v3.0/spark3/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java b/spark/v3.0/spark3/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
index a51f9ee..76923d4 100644
--- a/spark/v3.0/spark3/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
+++ b/spark/v3.0/spark3/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
@@ -44,11 +44,14 @@ public class TestIcebergSourceHiveTables extends TestIcebergSourceTablesBase {
 
   @After
   public void dropTable() throws IOException {
-    Table table = catalog.loadTable(currentIdentifier);
-    Path tablePath = new Path(table.location());
-    FileSystem fs = tablePath.getFileSystem(spark.sessionState().newHadoopConf());
-    fs.delete(tablePath, true);
-    catalog.dropTable(currentIdentifier, false);
+    if (currentIdentifier != null) {
+      Table table = catalog.loadTable(currentIdentifier);
+      Path tablePath = new Path(table.location());
+      FileSystem fs = tablePath.getFileSystem(spark.sessionState().newHadoopConf());
+      fs.delete(tablePath, true);
+      catalog.dropTable(currentIdentifier, false);
+      currentIdentifier = null;
+    }
   }
 
   @Override
diff --git a/spark/v3.0/spark3/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/v3.0/spark3/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 10b9d6f..fc6d1c5 100644
--- a/spark/v3.0/spark3/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/v3.0/spark3/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -43,6 +43,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkTableUtil;
 import org.apache.iceberg.spark.SparkTestBase;
@@ -1111,7 +1112,7 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
     // check time travel
     List<Row> actualAfterFirstCommit = spark.read()
         .format("iceberg")
-        .option("snapshot-id", String.valueOf(firstCommitId))
+        .option(SparkReadOptions.SNAPSHOT_ID, String.valueOf(firstCommitId))
         .load(loadLocation(tableIdentifier, "partitions"))
         .orderBy("partition.id")
         .collectAsList();