Posted to commits@iceberg.apache.org by ao...@apache.org on 2021/09/27 18:41:33 UTC

[iceberg] branch master updated: Spark: Support spec ID and partition metadata columns (#2984)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 7cf96f0  Spark: Support spec ID and partition metadata columns (#2984)
7cf96f0 is described below

commit 7cf96f096edeceac13916676c273954fdd435513
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Mon Sep 27 11:41:20 2021 -0700

    Spark: Support spec ID and partition metadata columns (#2984)
---
 .../org/apache/iceberg/util/StructProjection.java  |  33 +++-
 .../java/org/apache/iceberg/MetadataColumns.java   |  35 +++-
 .../org/apache/iceberg/util/PartitionUtil.java     |  30 +++-
 .../iceberg/spark/source/BaseDataReader.java       |  47 ++++-
 .../iceberg/spark/source/BatchDataReader.java      |   5 +-
 .../spark/source/EqualityDeleteRowReader.java      |   3 +-
 .../apache/iceberg/spark/source/RowDataReader.java |   5 +-
 .../spark/source/TestSparkBaseDataReader.java      |  23 +--
 .../iceberg/spark/source/SparkScanBuilder.java     |   2 +-
 .../iceberg/spark/source/SparkTestTable.java       |  60 +++++++
 .../iceberg/spark/source/TestSparkCatalog.java     |  12 +-
 .../spark/source/TestSparkMetadataColumns.java     | 194 +++++++++++++++++++++
 12 files changed, 407 insertions(+), 42 deletions(-)
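
This change introduces two Spark metadata columns: _spec_id, the ID of the partition spec that wrote a row's data file, and _partition, the row's partition tuple coerced to the table-wide partition type. As a rough usage sketch only (the table and column names below are placeholders; the commit's TODOs note that plain metadata-column resolution waits on compiling against Spark 3.2, and the tests further down use a '$'-based identifier workaround instead):

    // illustrative sketch, not part of the commit
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class SpecAndPartitionColumnsExample {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("metadata-columns").getOrCreate();
        // _spec_id: partition spec ID of the file a row came from
        // _partition: partition tuple, with nulls for fields missing from that file's spec
        Dataset<Row> rows = spark.sql("SELECT _spec_id, _partition, id FROM db.tbl");
        rows.show();
      }
    }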

diff --git a/api/src/main/java/org/apache/iceberg/util/StructProjection.java b/api/src/main/java/org/apache/iceberg/util/StructProjection.java
index be05b0f..c18f69f 100644
--- a/api/src/main/java/org/apache/iceberg/util/StructProjection.java
+++ b/api/src/main/java/org/apache/iceberg/util/StructProjection.java
@@ -58,12 +58,30 @@ public class StructProjection implements StructLike {
     return new StructProjection(dataSchema.asStruct(), projectedSchema.asStruct());
   }
 
+  /**
+   * Creates a projecting wrapper for {@link StructLike} rows.
+   * <p>
+   * This projection allows missing fields and does not work with repeated types like lists and maps.
+   *
+   * @param structType type of rows wrapped by this projection
+   * @param projectedStructType result type of the projected rows
+   * @return a wrapper to project rows
+   */
+  public static StructProjection createAllowMissing(StructType structType, StructType projectedStructType) {
+    return new StructProjection(structType, projectedStructType, true);
+  }
+
   private final StructType type;
   private final int[] positionMap;
   private final StructProjection[] nestedProjections;
   private StructLike struct;
 
   private StructProjection(StructType structType, StructType projection) {
+    this(structType, projection, false);
+  }
+
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
+  private StructProjection(StructType structType, StructType projection, boolean allowMissing) {
     this.type = projection;
     this.positionMap = new int[projection.fields().size()];
     this.nestedProjections = new StructProjection[projection.fields().size()];
@@ -116,7 +134,10 @@ public class StructProjection implements StructLike {
         }
       }
 
-      if (!found) {
+      if (!found && projectedField.isOptional() && allowMissing) {
+        positionMap[pos] = -1;
+        nestedProjections[pos] = null;
+      } else if (!found) {
         throw new IllegalArgumentException(String.format("Cannot find field %s in %s", projectedField, structType));
       }
     }
@@ -134,11 +155,17 @@ public class StructProjection implements StructLike {
 
   @Override
   public <T> T get(int pos, Class<T> javaClass) {
+    int structPos = positionMap[pos];
+
     if (nestedProjections[pos] != null) {
-      return javaClass.cast(nestedProjections[pos].wrap(struct.get(positionMap[pos], StructLike.class)));
+      return javaClass.cast(nestedProjections[pos].wrap(struct.get(structPos, StructLike.class)));
     }
 
-    return struct.get(positionMap[pos], javaClass);
+    if (structPos != -1) {
+      return struct.get(structPos, javaClass);
+    } else {
+      return null;
+    }
   }
 
   @Override
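
For context, a minimal sketch (not part of the commit) of the new allow-missing projection: it wraps rows of a source struct type and returns null for projected optional fields that the source type does not contain. The Pair row class below is a made-up helper for illustration.

    import org.apache.iceberg.StructLike;
    import org.apache.iceberg.types.Types;
    import org.apache.iceberg.util.StructProjection;

    public class AllowMissingProjectionExample {

      // trivial in-memory row, for illustration only
      static class Pair implements StructLike {
        private final Object[] values;

        Pair(Object... values) {
          this.values = values;
        }

        @Override
        public int size() {
          return values.length;
        }

        @Override
        public <T> T get(int pos, Class<T> javaClass) {
          return javaClass.cast(values[pos]);
        }

        @Override
        public <T> void set(int pos, T value) {
          values[pos] = value;
        }
      }

      public static void main(String[] args) {
        Types.StructType source = Types.StructType.of(
            Types.NestedField.required(1, "id", Types.LongType.get()));
        Types.StructType projected = Types.StructType.of(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.optional(2, "data", Types.StringType.get()));

        StructProjection projection = StructProjection.createAllowMissing(source, projected);
        projection.wrap(new Pair(5L));

        System.out.println(projection.get(0, Long.class));   // 5
        System.out.println(projection.get(1, String.class)); // null: field 2 is missing in the source
      }
    }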
diff --git a/core/src/main/java/org/apache/iceberg/MetadataColumns.java b/core/src/main/java/org/apache/iceberg/MetadataColumns.java
index e1cf096..af7b655 100644
--- a/core/src/main/java/org/apache/iceberg/MetadataColumns.java
+++ b/core/src/main/java/org/apache/iceberg/MetadataColumns.java
@@ -38,6 +38,12 @@ public class MetadataColumns {
       Integer.MAX_VALUE - 2, "_pos", Types.LongType.get(), "Ordinal position of a row in the source data file");
   public static final NestedField IS_DELETED = NestedField.required(
       Integer.MAX_VALUE - 3, "_deleted", Types.BooleanType.get(), "Whether the row has been deleted");
+  public static final NestedField SPEC_ID = NestedField.required(
+      Integer.MAX_VALUE - 4, "_spec_id", Types.IntegerType.get(), "Spec ID used to track the file containing a row");
+  // the partition column type is not static and depends on all specs in the table
+  public static final int PARTITION_COLUMN_ID = Integer.MAX_VALUE - 5;
+  public static final String PARTITION_COLUMN_NAME = "_partition";
+  public static final String PARTITION_COLUMN_DOC = "Partition to which a row belongs";
 
   // IDs Integer.MAX_VALUE - (101-200) are used for reserved columns
   public static final NestedField DELETE_FILE_PATH = NestedField.required(
@@ -51,24 +57,39 @@ public class MetadataColumns {
   private static final Map<String, NestedField> META_COLUMNS = ImmutableMap.of(
       FILE_PATH.name(), FILE_PATH,
       ROW_POSITION.name(), ROW_POSITION,
-      IS_DELETED.name(), IS_DELETED);
+      IS_DELETED.name(), IS_DELETED,
+      SPEC_ID.name(), SPEC_ID
+  );
 
-  private static final Set<Integer> META_IDS = META_COLUMNS.values().stream().map(NestedField::fieldId)
-      .collect(ImmutableSet.toImmutableSet());
+  private static final Set<Integer> META_IDS = ImmutableSet.of(
+      FILE_PATH.fieldId(),
+      ROW_POSITION.fieldId(),
+      IS_DELETED.fieldId(),
+      SPEC_ID.fieldId(),
+      PARTITION_COLUMN_ID
+  );
 
   public static Set<Integer> metadataFieldIds() {
     return META_IDS;
   }
 
-  public static NestedField get(String name) {
-    return META_COLUMNS.get(name);
+  public static NestedField metadataColumn(Table table, String name) {
+    if (name.equals(PARTITION_COLUMN_NAME)) {
+      return Types.NestedField.optional(
+          PARTITION_COLUMN_ID,
+          PARTITION_COLUMN_NAME,
+          Partitioning.partitionType(table),
+          PARTITION_COLUMN_DOC);
+    } else {
+      return META_COLUMNS.get(name);
+    }
   }
 
   public static boolean isMetadataColumn(String name) {
-    return META_COLUMNS.containsKey(name);
+    return name.equals(PARTITION_COLUMN_NAME) || META_COLUMNS.containsKey(name);
   }
 
   public static boolean nonMetadataColumn(String name) {
-    return !META_COLUMNS.containsKey(name);
+    return !isMetadataColumn(name);
   }
 }
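
A small sketch (not from the commit) of how the reworked lookups might be used. Name checks remain static, but _partition has no fixed schema: its struct type is the union of partition fields across all of the table's specs, so resolving its NestedField now requires the table.

    import org.apache.iceberg.MetadataColumns;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.types.Types;

    public class MetadataColumnLookupExample {

      public static void main(String[] args) {
        System.out.println(MetadataColumns.isMetadataColumn("_spec_id"));   // true
        System.out.println(MetadataColumns.isMetadataColumn("_partition")); // true
        System.out.println(MetadataColumns.nonMetadataColumn("id"));        // true
      }

      // built on demand from Partitioning.partitionType(table)
      static Types.NestedField partitionColumn(Table table) {
        return MetadataColumns.metadataColumn(table, "_partition");
      }
    }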
diff --git a/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
index 929f77a..02c8b30 100644
--- a/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
@@ -36,10 +36,15 @@ public class PartitionUtil {
   }
 
   public static Map<Integer, ?> constantsMap(FileScanTask task) {
-    return constantsMap(task, (type, constant) -> constant);
+    return constantsMap(task, null, (type, constant) -> constant);
   }
 
   public static Map<Integer, ?> constantsMap(FileScanTask task, BiFunction<Type, Object, Object> convertConstant) {
+    return constantsMap(task, null, convertConstant);
+  }
+
+  public static Map<Integer, ?> constantsMap(FileScanTask task, Types.StructType partitionType,
+                                             BiFunction<Type, Object, Object> convertConstant) {
     PartitionSpec spec = task.spec();
     StructLike partitionData = task.file().partition();
 
@@ -51,6 +56,22 @@ public class PartitionUtil {
         MetadataColumns.FILE_PATH.fieldId(),
         convertConstant.apply(Types.StringType.get(), task.file().path()));
 
+    // add _spec_id
+    idToConstant.put(
+        MetadataColumns.SPEC_ID.fieldId(),
+        convertConstant.apply(Types.IntegerType.get(), task.file().specId()));
+
+    // add _partition
+    if (partitionType != null) {
+      if (partitionType.fields().size() > 0) {
+        StructLike coercedPartition = coercePartition(partitionType, spec, partitionData);
+        idToConstant.put(MetadataColumns.PARTITION_COLUMN_ID, convertConstant.apply(partitionType, coercedPartition));
+      } else {
+        // use null as some query engines may not be able to handle empty structs
+        idToConstant.put(MetadataColumns.PARTITION_COLUMN_ID, null);
+      }
+    }
+
     List<Types.NestedField> partitionFields = spec.partitionType().fields();
     List<PartitionField> fields = spec.fields();
     for (int pos = 0; pos < fields.size(); pos += 1) {
@@ -63,4 +84,11 @@ public class PartitionUtil {
 
     return idToConstant;
   }
+
+  // adapts the provided partition data to match the table partition type
+  private static StructLike coercePartition(Types.StructType partitionType, PartitionSpec spec, StructLike partition) {
+    StructProjection projection = StructProjection.createAllowMissing(spec.partitionType(), partitionType);
+    projection.wrap(partition);
+    return projection;
+  }
 }
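
A sketch of the new three-argument overload (assumed inputs, not part of the commit): given a scan task and the table-wide partition type from Partitioning.partitionType(table), the returned map carries a constant for _spec_id and, when the partition type is non-empty, a _partition value coerced from the file's spec to the table-wide type.

    import java.util.Map;
    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.MetadataColumns;
    import org.apache.iceberg.Partitioning;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.types.Types;
    import org.apache.iceberg.util.PartitionUtil;

    public class ConstantsMapExample {

      static Map<Integer, ?> constants(Table table, FileScanTask task) {
        Types.StructType partitionType = Partitioning.partitionType(table);
        // identity conversion; engines pass their own convertConstant function
        Map<Integer, ?> idToConstant = PartitionUtil.constantsMap(task, partitionType, (type, value) -> value);
        System.out.println(idToConstant.get(MetadataColumns.SPEC_ID.fieldId()));
        System.out.println(idToConstant.get(MetadataColumns.PARTITION_COLUMN_ID));
        return idToConstant;
      }
    }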
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
index c8b33dd..b58745c 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/BaseDataReader.java
@@ -24,24 +24,32 @@ import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.stream.Stream;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.util.Utf8;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.encryption.EncryptedFiles;
 import org.apache.iceberg.encryption.EncryptedInputFile;
-import org.apache.iceberg.encryption.EncryptionManager;
 import org.apache.iceberg.io.CloseableIterator;
-import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StructType;
 import org.apache.iceberg.util.ByteBuffers;
+import org.apache.iceberg.util.PartitionUtil;
 import org.apache.spark.rdd.InputFileBlockHolder;
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.unsafe.types.UTF8String;
 import org.slf4j.Logger;
@@ -55,6 +63,7 @@ import org.slf4j.LoggerFactory;
 abstract class BaseDataReader<T> implements Closeable {
   private static final Logger LOG = LoggerFactory.getLogger(BaseDataReader.class);
 
+  private final Table table;
   private final Iterator<FileScanTask> tasks;
   private final Map<String, InputFile> inputFiles;
 
@@ -62,17 +71,18 @@ abstract class BaseDataReader<T> implements Closeable {
   private T current = null;
   private FileScanTask currentTask = null;
 
-  BaseDataReader(CombinedScanTask task, FileIO io, EncryptionManager encryptionManager) {
+  BaseDataReader(Table table, CombinedScanTask task) {
+    this.table = table;
     this.tasks = task.files().iterator();
     Map<String, ByteBuffer> keyMetadata = Maps.newHashMap();
     task.files().stream()
         .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
         .forEach(file -> keyMetadata.put(file.path().toString(), file.keyMetadata()));
     Stream<EncryptedInputFile> encrypted = keyMetadata.entrySet().stream()
-        .map(entry -> EncryptedFiles.encryptedInput(io.newInputFile(entry.getKey()), entry.getValue()));
+        .map(entry -> EncryptedFiles.encryptedInput(table.io().newInputFile(entry.getKey()), entry.getValue()));
 
     // decrypt with the batch call to avoid multiple RPCs to a key server, if possible
-    Iterable<InputFile> decryptedFiles = encryptionManager.decrypt(encrypted::iterator);
+    Iterable<InputFile> decryptedFiles = table.encryption().decrypt(encrypted::iterator);
 
     Map<String, InputFile> files = Maps.newHashMapWithExpectedSize(task.files().size());
     decryptedFiles.forEach(decrypted -> files.putIfAbsent(decrypted.location(), decrypted));
@@ -132,6 +142,15 @@ abstract class BaseDataReader<T> implements Closeable {
     return inputFiles.get(location);
   }
 
+  protected Map<Integer, ?> constantsMap(FileScanTask task, Schema readSchema) {
+    if (readSchema.findField(MetadataColumns.PARTITION_COLUMN_ID) != null) {
+      StructType partitionType = Partitioning.partitionType(table);
+      return PartitionUtil.constantsMap(task, partitionType, BaseDataReader::convertConstant);
+    } else {
+      return PartitionUtil.constantsMap(task, BaseDataReader::convertConstant);
+    }
+  }
+
   protected static Object convertConstant(Type type, Object value) {
     if (value == null) {
       return null;
@@ -155,6 +174,24 @@ abstract class BaseDataReader<T> implements Closeable {
         return ByteBuffers.toByteArray((ByteBuffer) value);
       case BINARY:
         return ByteBuffers.toByteArray((ByteBuffer) value);
+      case STRUCT:
+        StructType structType = (StructType) type;
+
+        if (structType.fields().isEmpty()) {
+          return new GenericInternalRow();
+        }
+
+        List<NestedField> fields = structType.fields();
+        Object[] values = new Object[fields.size()];
+        StructLike struct = (StructLike) value;
+
+        for (int index = 0; index < fields.size(); index++) {
+          NestedField field = fields.get(index);
+          Type fieldType = field.type();
+          values[index] = convertConstant(fieldType, struct.get(index, fieldType.typeId().javaClass()));
+        }
+
+        return new GenericInternalRow(values);
       default:
     }
     return value;
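
The new STRUCT branch above is what turns the _partition constant into a Spark value. A standalone sketch of the same idea (not the commit's code, and without the recursive conversion applied to nested values):

    import org.apache.iceberg.StructLike;
    import org.apache.iceberg.types.Type;
    import org.apache.iceberg.types.Types;
    import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;

    public class StructConstantSketch {

      static GenericInternalRow toInternalRow(Types.StructType structType, StructLike struct) {
        if (structType.fields().isEmpty()) {
          // empty struct types map to empty rows (PartitionUtil itself stores null for empty partition types)
          return new GenericInternalRow();
        }

        Object[] values = new Object[structType.fields().size()];
        for (int index = 0; index < structType.fields().size(); index++) {
          Type fieldType = structType.fields().get(index).type();
          values[index] = struct.get(index, fieldType.typeId().javaClass());
        }
        return new GenericInternalRow(values);
      }
    }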
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
index 8cfe46b..e4bd3ce 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java
@@ -41,7 +41,6 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders;
 import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders;
 import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.util.PartitionUtil;
 import org.apache.spark.rdd.InputFileBlockHolder;
 import org.apache.spark.sql.vectorized.ColumnarBatch;
 
@@ -52,7 +51,7 @@ class BatchDataReader extends BaseDataReader<ColumnarBatch> {
   private final int batchSize;
 
   BatchDataReader(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive, int size) {
-    super(task, table.io(), table.encryption());
+    super(table, task);
     this.expectedSchema = expectedSchema;
     this.nameMapping = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
     this.caseSensitive = caseSensitive;
@@ -66,7 +65,7 @@ class BatchDataReader extends BaseDataReader<ColumnarBatch> {
     // update the current file for Spark's filename() function
     InputFileBlockHolder.set(file.path().toString(), task.start(), task.length());
 
-    Map<Integer, ?> idToConstant = PartitionUtil.constantsMap(task, BatchDataReader::convertConstant);
+    Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema);
 
     CloseableIterable<ColumnarBatch> iter;
     InputFile location = getInputFile(task);
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
index d4328ad..ce2226f 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java
@@ -26,7 +26,6 @@ import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.io.CloseableIterator;
-import org.apache.iceberg.util.PartitionUtil;
 import org.apache.spark.rdd.InputFileBlockHolder;
 import org.apache.spark.sql.catalyst.InternalRow;
 
@@ -44,7 +43,7 @@ public class EqualityDeleteRowReader extends RowDataReader {
 
     // schema of rows returned by readers
     Schema requiredSchema = matches.requiredSchema();
-    Map<Integer, ?> idToConstant = PartitionUtil.constantsMap(task, RowDataReader::convertConstant);
+    Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema);
     DataFile file = task.file();
 
     // update the current file for Spark's filename() function
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index 391d4a0..8770e17 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -44,7 +44,6 @@ import org.apache.iceberg.spark.data.SparkAvroReader;
 import org.apache.iceberg.spark.data.SparkOrcReader;
 import org.apache.iceberg.spark.data.SparkParquetReaders;
 import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.util.PartitionUtil;
 import org.apache.spark.rdd.InputFileBlockHolder;
 import org.apache.spark.sql.catalyst.InternalRow;
 
@@ -56,7 +55,7 @@ class RowDataReader extends BaseDataReader<InternalRow> {
   private final boolean caseSensitive;
 
   RowDataReader(CombinedScanTask task, Table table, Schema expectedSchema, boolean caseSensitive) {
-    super(task, table.io(), table.encryption());
+    super(table, task);
     this.tableSchema = table.schema();
     this.expectedSchema = expectedSchema;
     this.nameMapping = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
@@ -69,7 +68,7 @@ class RowDataReader extends BaseDataReader<InternalRow> {
 
     // schema of rows returned by readers
     Schema requiredSchema = deletes.requiredSchema();
-    Map<Integer, ?> idToConstant = PartitionUtil.constantsMap(task, RowDataReader::convertConstant);
+    Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema);
     DataFile file = task.file();
 
     // update the current file for Spark's filename() function
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader.java
index 51b47cb..8bae666 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkBaseDataReader.java
@@ -30,7 +30,6 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.StreamSupport;
 import org.apache.avro.generic.GenericData;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.BaseCombinedScanTask;
 import org.apache.iceberg.DataFile;
@@ -39,8 +38,6 @@ import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.encryption.PlaintextEncryptionManager;
-import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.FileAppender;
 import org.apache.iceberg.parquet.Parquet;
@@ -59,7 +56,7 @@ public abstract class TestSparkBaseDataReader {
   @Rule
   public TemporaryFolder temp = new TemporaryFolder();
 
-  private static final Configuration CONFD = new Configuration();
+  private Table table;
 
   // Simulates the closeable iterator of data to be read
   private static class CloseableIntegerRange implements CloseableIterator<Integer> {
@@ -92,10 +89,8 @@ public abstract class TestSparkBaseDataReader {
   private static class ClosureTrackingReader extends BaseDataReader<Integer> {
     private Map<String, CloseableIntegerRange> tracker = new HashMap<>();
 
-    ClosureTrackingReader(List<FileScanTask> tasks) {
-      super(new BaseCombinedScanTask(tasks),
-          new HadoopFileIO(CONFD),
-          new PlaintextEncryptionManager());
+    ClosureTrackingReader(Table table, List<FileScanTask> tasks) {
+      super(table, new BaseCombinedScanTask(tasks));
     }
 
     @Override
@@ -124,7 +119,7 @@ public abstract class TestSparkBaseDataReader {
     Integer recordPerTask = 10;
     List<FileScanTask> tasks = createFileScanTasks(totalTasks, recordPerTask);
 
-    ClosureTrackingReader reader = new ClosureTrackingReader(tasks);
+    ClosureTrackingReader reader = new ClosureTrackingReader(table, tasks);
 
     int countRecords = 0;
     while (reader.next()) {
@@ -151,7 +146,7 @@ public abstract class TestSparkBaseDataReader {
     FileScanTask firstTask = tasks.get(0);
     FileScanTask secondTask = tasks.get(1);
 
-    ClosureTrackingReader reader = new ClosureTrackingReader(tasks);
+    ClosureTrackingReader reader = new ClosureTrackingReader(table, tasks);
 
     // Total of 2 elements
     Assert.assertTrue(reader.next());
@@ -175,7 +170,7 @@ public abstract class TestSparkBaseDataReader {
     Integer recordPerTask = 10;
     List<FileScanTask> tasks = createFileScanTasks(totalTasks, recordPerTask);
 
-    ClosureTrackingReader reader = new ClosureTrackingReader(tasks);
+    ClosureTrackingReader reader = new ClosureTrackingReader(table, tasks);
 
     reader.close();
 
@@ -191,7 +186,7 @@ public abstract class TestSparkBaseDataReader {
     Integer recordPerTask = 10;
     List<FileScanTask> tasks = createFileScanTasks(totalTasks, recordPerTask);
 
-    ClosureTrackingReader reader = new ClosureTrackingReader(tasks);
+    ClosureTrackingReader reader = new ClosureTrackingReader(table, tasks);
 
     Integer halfDataSize = (totalTasks * recordPerTask) / 2;
     for (int i = 0; i < halfDataSize; i++) {
@@ -217,7 +212,7 @@ public abstract class TestSparkBaseDataReader {
     Integer recordPerTask = 10;
     List<FileScanTask> tasks = createFileScanTasks(totalTasks, recordPerTask);
 
-    ClosureTrackingReader reader = new ClosureTrackingReader(tasks);
+    ClosureTrackingReader reader = new ClosureTrackingReader(table, tasks);
 
     // Total 100 elements, only 5 iterators have been created
     for (int i = 0; i < 45; i++) {
@@ -250,7 +245,7 @@ public abstract class TestSparkBaseDataReader {
     );
 
     try {
-      Table table = TestTables.create(location, desc, schema, PartitionSpec.unpartitioned());
+      this.table = TestTables.create(location, desc, schema, PartitionSpec.unpartitioned());
       // Important: use the table's schema for the rest of the test
       // When tables are created, the column ids are reassigned.
       Schema tableSchema = table.schema();
diff --git a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 633a171..483dbcf 100644
--- a/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark3/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -149,7 +149,7 @@ public class SparkScanBuilder implements ScanBuilder, SupportsPushDownFilters, S
     // metadata columns
     List<Types.NestedField> fields = metaColumns.stream()
         .distinct()
-        .map(MetadataColumns::get)
+        .map(name -> MetadataColumns.metadataColumn(table, name))
         .collect(Collectors.toList());
     Schema meta = new Schema(fields);
 
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/SparkTestTable.java b/spark3/src/test/java/org/apache/iceberg/spark/source/SparkTestTable.java
new file mode 100644
index 0000000..afb1136
--- /dev/null
+++ b/spark3/src/test/java/org/apache/iceberg/spark/source/SparkTestTable.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+// TODO: remove this class once we compile against Spark 3.2
+public class SparkTestTable extends SparkTable {
+
+  private final String[] metadataColumnNames;
+
+  public SparkTestTable(Table icebergTable, String[] metadataColumnNames, boolean refreshEagerly) {
+    super(icebergTable, refreshEagerly);
+    this.metadataColumnNames = metadataColumnNames;
+  }
+
+  @Override
+  public StructType schema() {
+    StructType schema = super.schema();
+    if (metadataColumnNames != null) {
+      for (String columnName : metadataColumnNames) {
+        Types.NestedField metadataColumn = MetadataColumns.metadataColumn(table(), columnName);
+        schema = schema.add(columnName, SparkSchemaUtil.convert(metadataColumn.type()));
+      }
+    }
+    return schema;
+  }
+
+  @Override
+  public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
+    SparkScanBuilder scanBuilder = (SparkScanBuilder) super.newScanBuilder(options);
+    if (metadataColumnNames != null) {
+      scanBuilder.withMetadataColumns(metadataColumnNames);
+    }
+    return scanBuilder;
+  }
+}
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java
index 92013d3..027c88c 100644
--- a/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java
+++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkCatalog.java
@@ -19,7 +19,6 @@
 
 package org.apache.iceberg.spark.source;
 
-import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkSessionCatalog;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.connector.catalog.Identifier;
@@ -31,7 +30,14 @@ public class TestSparkCatalog<T extends TableCatalog & SupportsNamespaces> exten
 
   @Override
   public Table loadTable(Identifier ident) throws NoSuchTableException {
-    TestTables.TestTable table = TestTables.load(Spark3Util.identifierToTableIdentifier(ident).toString());
-    return new SparkTable(table, false);
+    String[] parts = ident.name().split("\\$", 2);
+    if (parts.length == 2) {
+      TestTables.TestTable table = TestTables.load(parts[0]);
+      String[] metadataColumns = parts[1].split(",");
+      return new SparkTestTable(table, metadataColumns, false);
+    } else {
+      TestTables.TestTable table = TestTables.load(ident.name());
+      return new SparkTestTable(table, null, false);
+    }
   }
 }
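
Until the build moves to the Spark 3.2 metadata-column API, the tests encode the requested metadata columns in the table identifier: everything after '$' is split on commas and exposed through SparkTestTable. Roughly, assuming the spark_catalog configuration from TestSparkMetadataColumns below:

    import org.apache.spark.sql.SparkSession;

    public class MetadataColumnWorkaroundExample {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().getOrCreate();
        // "test_table" plus the metadata columns to expose, joined with '$' and ','
        spark.sql("SELECT _spec_id, _partition FROM `test_table$_spec_id,_partition`").show();
      }
    }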
diff --git a/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
new file mode 100644
index 0000000..b29d281
--- /dev/null
+++ b/spark3/src/test/java/org/apache/iceberg/spark/source/TestSparkMetadataColumns.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.HasTableOperations;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.PartitionSpecParser;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.UpdateProperties;
+import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.iceberg.types.Types;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
+import static org.apache.iceberg.TableProperties.ORC_VECTORIZATION_ENABLED;
+import static org.apache.iceberg.TableProperties.PARQUET_VECTORIZATION_ENABLED;
+
+@RunWith(Parameterized.class)
+public class TestSparkMetadataColumns extends SparkTestBase {
+
+  private static final String TABLE_NAME = "test_table";
+  private static final Schema SCHEMA = new Schema(
+      Types.NestedField.required(1, "id", Types.LongType.get()),
+      Types.NestedField.optional(2, "category", Types.StringType.get()),
+      Types.NestedField.optional(3, "data", Types.StringType.get())
+  );
+  private static final PartitionSpec UNKNOWN_SPEC = PartitionSpecParser.fromJson(SCHEMA,
+      "{ \"spec-id\": 1, \"fields\": [ { \"name\": \"id_zero\", \"transform\": \"zero\", \"source-id\": 1 } ] }");
+
+  @Parameterized.Parameters(name = "fileFormat = {0}, vectorized = {1}, formatVersion = {2}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+        { FileFormat.PARQUET, false, 1},
+        { FileFormat.PARQUET, true, 1},
+        { FileFormat.PARQUET, false, 2},
+        { FileFormat.PARQUET, true, 2},
+        { FileFormat.AVRO, false, 1},
+        { FileFormat.AVRO, false, 2},
+        { FileFormat.ORC, false, 1},
+        { FileFormat.ORC, true, 1},
+        { FileFormat.ORC, false, 2},
+        { FileFormat.ORC, true, 2},
+    };
+  }
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  private final FileFormat fileFormat;
+  private final boolean vectorized;
+  private final int formatVersion;
+
+  private Table table = null;
+
+  public TestSparkMetadataColumns(FileFormat fileFormat, boolean vectorized, int formatVersion) {
+    this.fileFormat = fileFormat;
+    this.vectorized = vectorized;
+    this.formatVersion = formatVersion;
+  }
+
+  @BeforeClass
+  public static void setupSpark() {
+    ImmutableMap<String, String> config = ImmutableMap.of(
+        "type", "hive",
+        "default-namespace", "default",
+        "cache-enabled", "true"
+    );
+    spark.conf().set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.source.TestSparkCatalog");
+    config.forEach((key, value) -> spark.conf().set("spark.sql.catalog.spark_catalog." + key, value));
+  }
+
+  @Before
+  public void setupTable() throws IOException {
+    createAndInitTable();
+  }
+
+  @After
+  public void dropTable() {
+    TestTables.clearTables();
+  }
+
+  // TODO: remove testing workarounds once we compile against Spark 3.2
+
+  @Test
+  public void testSpecAndPartitionMetadataColumns() {
+    // TODO: support metadata structs in vectorized ORC reads
+    Assume.assumeFalse(fileFormat == FileFormat.ORC && vectorized);
+
+    sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", TABLE_NAME);
+
+    table.refresh();
+    table.updateSpec()
+        .addField("data")
+        .commit();
+    sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", TABLE_NAME);
+
+    table.refresh();
+    table.updateSpec()
+        .addField(Expressions.bucket("category", 8))
+        .commit();
+    sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", TABLE_NAME);
+
+    table.refresh();
+    table.updateSpec()
+        .removeField("data")
+        .commit();
+    sql("INSERT INTO TABLE %s VALUES (1, 'a1', 'b1')", TABLE_NAME);
+
+    table.refresh();
+    table.updateSpec()
+        .renameField("category_bucket_8", "category_bucket_8_another_name")
+        .commit();
+
+    List<Object[]> expected = ImmutableList.of(
+        row(0, row(null, null)),
+        row(1, row("b1", null)),
+        row(2, row("b1", 2)),
+        row(3, row(null, 2))
+    );
+    assertEquals("Rows must match", expected,
+        sql("SELECT _spec_id, _partition FROM `%s$_spec_id,_partition` ORDER BY _spec_id", TABLE_NAME));
+  }
+
+  @Test
+  public void testPartitionMetadataColumnWithUnknownTransforms() {
+    // replace the table spec to include an unknown transform
+    TableOperations ops = ((HasTableOperations) table).operations();
+    TableMetadata base = ops.current();
+    ops.commit(base, base.updatePartitionSpec(UNKNOWN_SPEC));
+
+    AssertHelpers.assertThrows("Should fail to query the partition metadata column",
+        ValidationException.class, "Cannot build table partition type, unknown transforms",
+        () -> sql("SELECT _partition FROM `%s$_partition`", TABLE_NAME));
+  }
+
+  private void createAndInitTable() throws IOException {
+    this.table = TestTables.create(temp.newFolder(), TABLE_NAME, SCHEMA, PartitionSpec.unpartitioned());
+
+    UpdateProperties updateProperties = table.updateProperties();
+    updateProperties.set(FORMAT_VERSION, String.valueOf(formatVersion));
+    updateProperties.set(DEFAULT_FILE_FORMAT, fileFormat.name());
+
+    switch (fileFormat) {
+      case PARQUET:
+        updateProperties.set(PARQUET_VECTORIZATION_ENABLED, String.valueOf(vectorized));
+        break;
+      case ORC:
+        updateProperties.set(ORC_VECTORIZATION_ENABLED, String.valueOf(vectorized));
+        break;
+      default:
+        Preconditions.checkState(!vectorized, "File format %s does not support vectorized reads", fileFormat);
+    }
+
+    updateProperties.commit();
+  }
+}