Posted to commits@iceberg.apache.org by ru...@apache.org on 2021/08/05 12:23:15 UTC

[iceberg] branch master updated: Spark: Fix nested struct pruning (#2877)

This is an automated email from the ASF dual-hosted git repository.

russellspitzer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 6809103  Spark: Fix nested struct pruning (#2877)
6809103 is described below

commit 68091037944ff7e9de91e7b619f313a8e98c1adc
Author: Russell Spitzer <rs...@apple.com>
AuthorDate: Thu Aug 5 07:23:07 2021 -0500

    Spark: Fix nested struct pruning (#2877)
    
    * Spark: Support Nested Struct Pruning in DataTasks
    
    Previously, DataTasks would return full schemas for some tables and pruned schemas for others, relying on the underlying framework to perform the actual projection. This change moves projection and pruning into the core responsibility of the task. It fixes an issue where Spark could push down some nested struct predicates to a metadata table, but we would not recognize this when performing the projection in the framework. StaticDataTasks now support projection in their creation b [...]
---
 .../org/apache/iceberg/util/StructProjection.java  |  28 ++-
 .../java/org/apache/iceberg/AllEntriesTable.java   |   5 +-
 .../java/org/apache/iceberg/AllManifestsTable.java |  18 +-
 .../java/org/apache/iceberg/DataFilesTable.java    |   6 +-
 .../main/java/org/apache/iceberg/HistoryTable.java |   5 +-
 .../org/apache/iceberg/ManifestEntriesTable.java   |  24 ++-
 .../java/org/apache/iceberg/ManifestsTable.java    |   5 +-
 .../java/org/apache/iceberg/PartitionsTable.java   |  14 +-
 .../java/org/apache/iceberg/SnapshotsTable.java    |   5 +-
 .../java/org/apache/iceberg/StaticDataTask.java    |  16 +-
 .../apache/iceberg/spark/source/RowDataReader.java |  40 +---
 .../spark/source/TestIcebergSourceTablesBase.java  | 211 +++++++++++++++++++++
 12 files changed, 303 insertions(+), 74 deletions(-)
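
As an illustration (not part of the patch), here is a minimal, hypothetical Java sketch of the projection pattern the tasks adopt: a DataTask builds a StructProjection from the metadata table's full schema and the requested, pruned schema, then wraps its raw rows so that engines such as Spark receive rows that already match the projection. Only the StructProjection.create and CloseableIterable.transform/wrap calls that appear in the diff below are assumed; the class and method names in the sketch itself are illustrative.

    import org.apache.iceberg.Schema;
    import org.apache.iceberg.StructLike;
    import org.apache.iceberg.io.CloseableIterable;
    import org.apache.iceberg.util.StructProjection;

    class ProjectionSketch {
      // tableSchema: the metadata table's full schema
      // projectedSchema: the pruned schema requested by the engine (e.g. Spark)
      static CloseableIterable<StructLike> project(CloseableIterable<StructLike> rows,
                                                   Schema tableSchema,
                                                   Schema projectedSchema) {
        // StructProjection.create rejects partial projections of structs nested inside
        // maps or lists, matching the new Preconditions checks in StructProjection.java
        StructProjection projection = StructProjection.create(tableSchema, projectedSchema);
        // wrap reuses the projection wrapper around each row rather than copying fields
        return CloseableIterable.transform(rows, projection::wrap);
      }
    }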

diff --git a/api/src/main/java/org/apache/iceberg/util/StructProjection.java b/api/src/main/java/org/apache/iceberg/util/StructProjection.java
index d916e77..be05b0f 100644
--- a/api/src/main/java/org/apache/iceberg/util/StructProjection.java
+++ b/api/src/main/java/org/apache/iceberg/util/StructProjection.java
@@ -23,8 +23,11 @@ import java.util.List;
 import java.util.Set;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.ListType;
+import org.apache.iceberg.types.Types.MapType;
 import org.apache.iceberg.types.Types.StructType;
 
 public class StructProjection implements StructLike {
@@ -82,8 +85,31 @@ public class StructProjection implements StructLike {
                   dataField.type().asStructType(), projectedField.type().asStructType());
               break;
             case MAP:
+              MapType projectedMap = projectedField.type().asMapType();
+              MapType originalMap = dataField.type().asMapType();
+
+              boolean keyProjectable = !projectedMap.keyType().isNestedType() ||
+                  projectedMap.keyType().equals(originalMap.keyType());
+              boolean valueProjectable = !projectedMap.valueType().isNestedType() ||
+                  projectedMap.valueType().equals(originalMap.valueType());
+              Preconditions.checkArgument(keyProjectable && valueProjectable,
+                  "Cannot project a partial map key or value struct. Trying to project %s out of %s",
+                  projectedField, dataField);
+
+              nestedProjections[pos] = null;
+              break;
             case LIST:
-              throw new IllegalArgumentException(String.format("Cannot project list or map field: %s", projectedField));
+              ListType projectedList = projectedField.type().asListType();
+              ListType originalList = dataField.type().asListType();
+
+              boolean elementProjectable = !projectedList.elementType().isNestedType() ||
+                  projectedList.elementType().equals(originalList.elementType());
+              Preconditions.checkArgument(elementProjectable,
+                  "Cannot project a partial list element struct. Trying to project %s out of %s",
+                  projectedField, dataField);
+
+              nestedProjections[pos] = null;
+              break;
             default:
               nestedProjections[pos] = null;
           }
diff --git a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
index c69429d..c1b7145 100644
--- a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
@@ -28,7 +28,6 @@ import org.apache.iceberg.expressions.ResidualEvaluator;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.util.ParallelIterable;
 import org.apache.iceberg.util.ThreadPools;
@@ -102,15 +101,13 @@ public class AllEntriesTable extends BaseMetadataTable {
         TableOperations ops, Snapshot snapshot, Expression rowFilter,
         boolean ignoreResiduals, boolean caseSensitive, boolean colStats) {
       CloseableIterable<ManifestFile> manifests = allManifestFiles(ops.current().snapshots());
-      Type fileProjection = schema().findType("data_file");
-      Schema fileSchema = fileProjection != null ? new Schema(fileProjection.asStructType().fields()) : new Schema();
       String schemaString = SchemaParser.toJson(schema());
       String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
       Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
       ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
 
       return CloseableIterable.transform(manifests, manifest -> new ManifestEntriesTable.ManifestReadTask(
-          ops.io(), manifest, fileSchema, schemaString, specString, residuals, ops.current().specsById()));
+          ops.io(), manifest, schema(), schemaString, specString, residuals, ops.current().specsById()));
     }
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
index d671934..6843929 100644
--- a/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllManifestsTable.java
@@ -31,6 +31,7 @@ import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.StructProjection;
 
 /**
  * A {@link Table} implementation that exposes a table's valid manifest files as rows.
@@ -124,7 +125,6 @@ public class AllManifestsTable extends BaseMetadataTable {
       String schemaString = SchemaParser.toJson(schema());
       String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
 
-      // Data tasks produce the table schema, not the projection schema and projection is done by processing engines.
       return CloseableIterable.withNoopClose(Iterables.transform(ops.current().snapshots(), snap -> {
         if (snap.manifestListLocation() != null) {
           Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
@@ -134,14 +134,15 @@ public class AllManifestsTable extends BaseMetadataTable {
               .withRecordCount(1)
               .withFormat(FileFormat.AVRO)
               .build();
-          return new ManifestListReadTask(ops.io(), table().spec(), new BaseFileScanTask(
+          return new ManifestListReadTask(ops.io(), schema(), table().spec(), new BaseFileScanTask(
               manifestListAsDataFile, null,
               schemaString, specString, residuals));
         } else {
           return StaticDataTask.of(
               ops.io().newInputFile(ops.current().metadataFileLocation()),
-              snap.allManifests(),
-              manifest -> ManifestsTable.manifestFileToRow(table().spec(), manifest));
+              MANIFEST_FILE_SCHEMA, schema(), snap.allManifests(),
+              manifest -> ManifestsTable.manifestFileToRow(table().spec(), manifest)
+          );
         }
       }));
     }
@@ -149,11 +150,13 @@ public class AllManifestsTable extends BaseMetadataTable {
 
   static class ManifestListReadTask implements DataTask {
     private final FileIO io;
+    private final Schema schema;
     private final PartitionSpec spec;
     private final FileScanTask manifestListTask;
 
-    ManifestListReadTask(FileIO io, PartitionSpec spec, FileScanTask manifestListTask) {
+    ManifestListReadTask(FileIO io, Schema schema, PartitionSpec spec, FileScanTask manifestListTask) {
       this.io = io;
+      this.schema = schema;
       this.spec = spec;
       this.manifestListTask = manifestListTask;
     }
@@ -175,9 +178,12 @@ public class AllManifestsTable extends BaseMetadataTable {
           .reuseContainers(false)
           .build()) {
 
-        return CloseableIterable.transform(manifests,
+        CloseableIterable<StructLike> rowIterable = CloseableIterable.transform(manifests,
             manifest -> ManifestsTable.manifestFileToRow(spec, manifest));
 
+        StructProjection projection = StructProjection.create(MANIFEST_FILE_SCHEMA, schema);
+        return CloseableIterable.transform(rowIterable, projection::wrap);
+
       } catch (IOException e) {
         throw new RuntimeIOException(e, "Cannot read manifest list file: %s", manifestListTask.file().path());
       }
diff --git a/core/src/main/java/org/apache/iceberg/DataFilesTable.java b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
index 13d28ad..145663c 100644
--- a/core/src/main/java/org/apache/iceberg/DataFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
@@ -109,12 +109,8 @@ public class DataFilesTable extends BaseMetadataTable {
       Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
       ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
 
-      // Data tasks produce the table schema, not the projection schema and projection is done by processing engines.
-      // This data task needs to use the table schema, which may not include a partition schema to avoid having an
-      // empty struct in the schema for unpartitioned tables. Some engines, like Spark, can't handle empty structs in
-      // all cases.
       return CloseableIterable.transform(manifests, manifest ->
-          new ManifestReadTask(ops.io(), manifest, fileSchema, schemaString, specString, residuals));
+          new ManifestReadTask(ops.io(), manifest, schema(), schemaString, specString, residuals));
     }
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/HistoryTable.java b/core/src/main/java/org/apache/iceberg/HistoryTable.java
index 9b607cc..9d0b0a6 100644
--- a/core/src/main/java/org/apache/iceberg/HistoryTable.java
+++ b/core/src/main/java/org/apache/iceberg/HistoryTable.java
@@ -68,8 +68,9 @@ public class HistoryTable extends BaseMetadataTable {
     TableOperations ops = operations();
     return StaticDataTask.of(
         ops.io().newInputFile(ops.current().metadataFileLocation()),
-        ops.current().snapshotLog(),
-        convertHistoryEntryFunc(table()));
+        schema(), scan.schema(), ops.current().snapshotLog(),
+        convertHistoryEntryFunc(table())
+    );
   }
 
   private class HistoryScan extends StaticTableScan {
diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
index 6b434fa..7bae349 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
@@ -29,6 +29,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.StructProjection;
 
 /**
  * A {@link Table} implementation that exposes a table's manifest entries as rows, for both delete and data files.
@@ -107,44 +108,53 @@ public class ManifestEntriesTable extends BaseMetadataTable {
         boolean ignoreResiduals, boolean caseSensitive, boolean colStats) {
       // return entries from both data and delete manifests
       CloseableIterable<ManifestFile> manifests = CloseableIterable.withNoopClose(snapshot.allManifests());
-      Type fileProjection = schema().findType("data_file");
-      Schema fileSchema = fileProjection != null ? new Schema(fileProjection.asStructType().fields()) : new Schema();
       String schemaString = SchemaParser.toJson(schema());
       String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
       Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : rowFilter;
       ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(filter);
 
       return CloseableIterable.transform(manifests, manifest ->
-          new ManifestReadTask(ops.io(), manifest, fileSchema, schemaString, specString, residuals,
+          new ManifestReadTask(ops.io(), manifest, schema(), schemaString, specString, residuals,
               ops.current().specsById()));
     }
   }
 
   static class ManifestReadTask extends BaseFileScanTask implements DataTask {
+    private final Schema schema;
     private final Schema fileSchema;
     private final FileIO io;
     private final ManifestFile manifest;
     private final Map<Integer, PartitionSpec> specsById;
 
-    ManifestReadTask(FileIO io, ManifestFile manifest, Schema fileSchema, String schemaString,
+    ManifestReadTask(FileIO io, ManifestFile manifest, Schema schema, String schemaString,
                      String specString, ResidualEvaluator residuals, Map<Integer, PartitionSpec> specsById) {
       super(DataFiles.fromManifest(manifest), null, schemaString, specString, residuals);
-      this.fileSchema = fileSchema;
+      this.schema = schema;
       this.io = io;
       this.manifest = manifest;
       this.specsById = specsById;
+
+      Type fileProjection = schema.findType("data_file");
+      this.fileSchema = fileProjection != null ? new Schema(fileProjection.asStructType().fields()) : new Schema();
     }
 
     @Override
     public CloseableIterable<StructLike> rows() {
+      // Project data-file fields
+      CloseableIterable<StructLike> prunedRows;
       if (manifest.content() == ManifestContent.DATA) {
-        return CloseableIterable.transform(ManifestFiles.read(manifest, io).project(fileSchema).entries(),
+        prunedRows = CloseableIterable.transform(ManifestFiles.read(manifest, io).project(fileSchema).entries(),
             file -> (GenericManifestEntry<DataFile>) file);
       } else {
-        return CloseableIterable.transform(ManifestFiles.readDeleteManifest(manifest, io, specsById)
+        prunedRows = CloseableIterable.transform(ManifestFiles.readDeleteManifest(manifest, io, specsById)
                 .project(fileSchema).entries(),
             file -> (GenericManifestEntry<DeleteFile>) file);
       }
+
+      // Project non-readable fields
+      Schema readSchema = ManifestEntry.wrapFileSchema(fileSchema.asStruct());
+      StructProjection projection = StructProjection.create(readSchema, schema);
+      return CloseableIterable.transform(prunedRows, projection::wrap);
     }
 
     @Override
diff --git a/core/src/main/java/org/apache/iceberg/ManifestsTable.java b/core/src/main/java/org/apache/iceberg/ManifestsTable.java
index d67b833..a818840 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestsTable.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestsTable.java
@@ -75,8 +75,9 @@ public class ManifestsTable extends BaseMetadataTable {
     String location = scan.snapshot().manifestListLocation();
     return StaticDataTask.of(
         ops.io().newInputFile(location != null ? location : ops.current().metadataFileLocation()),
-        scan.snapshot().allManifests(),
-        manifest -> ManifestsTable.manifestFileToRow(spec, manifest));
+        schema(), scan.schema(), scan.snapshot().allManifests(),
+        manifest -> ManifestsTable.manifestFileToRow(spec, manifest)
+    );
   }
 
   private class ManifestsTableScan extends StaticTableScan {
diff --git a/core/src/main/java/org/apache/iceberg/PartitionsTable.java b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
index e190ca4..0215dfd 100644
--- a/core/src/main/java/org/apache/iceberg/PartitionsTable.java
+++ b/core/src/main/java/org/apache/iceberg/PartitionsTable.java
@@ -76,11 +76,17 @@ public class PartitionsTable extends BaseMetadataTable {
     Iterable<Partition> partitions = partitions(scan);
     if (table().spec().fields().size() < 1) {
       // the table is unpartitioned, partitions contains only the root partition
-      return StaticDataTask.of(io().newInputFile(ops.current().metadataFileLocation()), partitions,
-          root -> StaticDataTask.Row.of(root.recordCount, root.fileCount));
+      return StaticDataTask.of(
+          io().newInputFile(ops.current().metadataFileLocation()),
+          schema(), scan.schema(), partitions,
+          root -> StaticDataTask.Row.of(root.recordCount, root.fileCount)
+      );
     } else {
-      return StaticDataTask.of(io().newInputFile(ops.current().metadataFileLocation()), partitions,
-          PartitionsTable::convertPartition);
+      return StaticDataTask.of(
+          io().newInputFile(ops.current().metadataFileLocation()),
+          schema(), scan.schema(), partitions,
+          PartitionsTable::convertPartition
+      );
     }
   }
 
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotsTable.java b/core/src/main/java/org/apache/iceberg/SnapshotsTable.java
index 3501662..4bb83af 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotsTable.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotsTable.java
@@ -60,8 +60,9 @@ public class SnapshotsTable extends BaseMetadataTable {
     TableOperations ops = operations();
     return StaticDataTask.of(
         ops.io().newInputFile(ops.current().metadataFileLocation()),
-        ops.current().snapshots(),
-        SnapshotsTable::snapshotToRow);
+        schema(), scan.schema(), ops.current().snapshots(),
+        SnapshotsTable::snapshotToRow
+    );
   }
 
   @Override
diff --git a/core/src/main/java/org/apache/iceberg/StaticDataTask.java b/core/src/main/java/org/apache/iceberg/StaticDataTask.java
index 24bff01..3aabd72 100644
--- a/core/src/main/java/org/apache/iceberg/StaticDataTask.java
+++ b/core/src/main/java/org/apache/iceberg/StaticDataTask.java
@@ -30,18 +30,26 @@ import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.StructProjection;
 
 class StaticDataTask implements DataTask {
 
-  static <T> DataTask of(InputFile metadata, Iterable<T> values, Function<T, Row> transform) {
+  static <T> DataTask of(InputFile metadata, Schema tableSchema, Schema projectedSchema, Iterable<T> values,
+      Function<T, Row> transform) {
     return new StaticDataTask(metadata,
+        tableSchema,
+        projectedSchema,
         Lists.newArrayList(Iterables.transform(values, transform::apply)).toArray(new Row[0]));
   }
 
   private final DataFile metadataFile;
   private final StructLike[] rows;
+  private final Schema tableSchema;
+  private final Schema projectedSchema;
 
-  private StaticDataTask(InputFile metadata, StructLike[] rows) {
+  private StaticDataTask(InputFile metadata, Schema tableSchema, Schema projectedSchema, StructLike[] rows) {
+    this.tableSchema = tableSchema;
+    this.projectedSchema = projectedSchema;
     this.metadataFile = DataFiles.builder(PartitionSpec.unpartitioned())
         .withInputFile(metadata)
         .withRecordCount(rows.length)
@@ -57,7 +65,9 @@ class StaticDataTask implements DataTask {
 
   @Override
   public CloseableIterable<StructLike> rows() {
-    return CloseableIterable.withNoopClose(Arrays.asList(rows));
+    StructProjection projection = StructProjection.create(tableSchema, projectedSchema);
+    Iterable<StructLike> projectedRows = Iterables.transform(Arrays.asList(rows), projection::wrap);
+    return CloseableIterable.withNoopClose(projectedRows);
   }
 
   @Override
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
index 6d4bf8e..391d4a0 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java
@@ -19,7 +19,6 @@
 
 package org.apache.iceberg.spark.source;
 
-import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.CombinedScanTask;
 import org.apache.iceberg.DataFile;
@@ -31,7 +30,6 @@ import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.common.DynMethods;
 import org.apache.iceberg.data.DeleteFilter;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
@@ -40,28 +38,17 @@ import org.apache.iceberg.mapping.NameMappingParser;
 import org.apache.iceberg.orc.ORC;
 import org.apache.iceberg.parquet.Parquet;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.data.SparkAvroReader;
 import org.apache.iceberg.spark.data.SparkOrcReader;
 import org.apache.iceberg.spark.data.SparkParquetReaders;
 import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.PartitionUtil;
 import org.apache.spark.rdd.InputFileBlockHolder;
 import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.catalyst.expressions.Attribute;
-import org.apache.spark.sql.catalyst.expressions.AttributeReference;
-import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
-import org.apache.spark.sql.types.StructType;
-import scala.collection.JavaConverters;
 
 class RowDataReader extends BaseDataReader<InternalRow> {
-  // for some reason, the apply method can't be called from Java without reflection
-  private static final DynMethods.UnboundMethod APPLY_PROJECTION = DynMethods.builder("apply")
-      .impl(UnsafeProjection.class, InternalRow.class)
-      .build();
 
   private final Schema tableSchema;
   private final Schema expectedSchema;
@@ -186,33 +173,10 @@ class RowDataReader extends BaseDataReader<InternalRow> {
   }
 
   private CloseableIterable<InternalRow> newDataIterable(DataTask task, Schema readSchema) {
-    StructInternalRow row = new StructInternalRow(tableSchema.asStruct());
+    StructInternalRow row = new StructInternalRow(readSchema.asStruct());
     CloseableIterable<InternalRow> asSparkRows = CloseableIterable.transform(
         task.asDataTask().rows(), row::setStruct);
-    return CloseableIterable.transform(
-        asSparkRows, APPLY_PROJECTION.bind(projection(readSchema, tableSchema))::invoke);
-  }
-
-  private static UnsafeProjection projection(Schema finalSchema, Schema readSchema) {
-    StructType struct = SparkSchemaUtil.convert(readSchema);
-
-    List<AttributeReference> refs = JavaConverters.seqAsJavaListConverter(struct.toAttributes()).asJava();
-    List<Attribute> attrs = Lists.newArrayListWithExpectedSize(struct.fields().length);
-    List<org.apache.spark.sql.catalyst.expressions.Expression> exprs =
-        Lists.newArrayListWithExpectedSize(struct.fields().length);
-
-    for (AttributeReference ref : refs) {
-      attrs.add(ref.toAttribute());
-    }
-
-    for (Types.NestedField field : finalSchema.columns()) {
-      int indexInReadSchema = struct.fieldIndex(field.name());
-      exprs.add(refs.get(indexInReadSchema));
-    }
-
-    return UnsafeProjection.create(
-        JavaConverters.asScalaBufferConverter(exprs).asScala().toSeq(),
-        JavaConverters.asScalaBufferConverter(attrs).asScala().toSeq());
+    return asSparkRows;
   }
 
   protected class SparkDeleteFilter extends DeleteFilter<InternalRow> {
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index 4244137..e4ca09f 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -23,6 +23,7 @@ import java.util.Comparator;
 import java.util.List;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileContent;
 import org.apache.iceberg.ManifestFile;
@@ -38,13 +39,16 @@ import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkTableUtil;
 import org.apache.iceberg.spark.SparkTestBase;
 import org.apache.iceberg.spark.data.TestHelpers;
 import org.apache.iceberg.types.Types;
+import org.apache.spark.SparkException;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
@@ -144,6 +148,89 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
   }
 
   @Test
+  public void testEntriesTableDataFilePrune() throws Exception {
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test");
+    Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
+
+    List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "1"));
+
+    Dataset<Row> inputDf = spark.createDataFrame(records, SimpleRecord.class);
+    inputDf.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(loadLocation(tableIdentifier));
+
+    table.refresh();
+    DataFile file = table.currentSnapshot().addedFiles().iterator().next();
+
+    List<Object[]> singleActual = rowsToJava(spark.read()
+        .format("iceberg")
+        .load(loadLocation(tableIdentifier, "entries"))
+        .select("data_file.file_path")
+        .collectAsList());
+
+    List<Object[]> singleExpected = ImmutableList.of(row(file.path()));
+
+    assertEquals("Should prune a single element from a nested struct", singleExpected, singleActual);
+  }
+
+  @Test
+  public void testEntriesTableDataFilePruneMulti() throws Exception {
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test");
+    Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
+
+    List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "1"));
+
+    Dataset<Row> inputDf = spark.createDataFrame(records, SimpleRecord.class);
+    inputDf.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(loadLocation(tableIdentifier));
+
+    table.refresh();
+    DataFile file = table.currentSnapshot().addedFiles().iterator().next();
+
+    List<Object[]> multiActual = rowsToJava(spark.read()
+        .format("iceberg")
+        .load(loadLocation(tableIdentifier, "entries"))
+        .select("data_file.file_path", "data_file.value_counts", "data_file.record_count", "data_file.column_sizes")
+        .collectAsList());
+
+    List<Object[]> multiExpected = ImmutableList.of(
+        row(file.path(), file.valueCounts(), file.recordCount(), file.columnSizes()));
+
+    assertEquals("Should prune a single element from a nested struct", multiExpected, multiActual);
+  }
+
+  @Test
+  public void testFilesSelectMap() throws Exception {
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test");
+    Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
+
+    List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "1"));
+
+    Dataset<Row> inputDf = spark.createDataFrame(records, SimpleRecord.class);
+    inputDf.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(loadLocation(tableIdentifier));
+
+    table.refresh();
+    DataFile file = table.currentSnapshot().addedFiles().iterator().next();
+
+    List<Object[]> multiActual = rowsToJava(spark.read()
+        .format("iceberg")
+        .load(loadLocation(tableIdentifier, "files"))
+        .select("file_path", "value_counts", "record_count", "column_sizes")
+        .collectAsList());
+
+    List<Object[]> multiExpected = ImmutableList.of(
+        row(file.path(), file.valueCounts(), file.recordCount(), file.columnSizes()));
+
+    assertEquals("Should prune a single element from a row", multiExpected, multiActual);
+  }
+
+  @Test
   public void testAllEntriesTable() throws Exception {
     TableIdentifier tableIdentifier = TableIdentifier.of("db", "entries_test");
     Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
@@ -678,6 +765,70 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
   }
 
   @Test
+  public void testPrunedSnapshotsTable() {
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", "snapshots_test");
+    Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.unpartitioned());
+
+    List<SimpleRecord> records = Lists.newArrayList(new SimpleRecord(1, "1"));
+    Dataset<Row> inputDf = spark.createDataFrame(records, SimpleRecord.class);
+
+    inputDf.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(loadLocation(tableIdentifier));
+
+    table.refresh();
+    long firstSnapshotTimestamp = table.currentSnapshot().timestampMillis();
+    long firstSnapshotId = table.currentSnapshot().snapshotId();
+
+    table.newDelete().deleteFromRowFilter(Expressions.alwaysTrue()).commit();
+
+    long secondSnapshotTimestamp = table.currentSnapshot().timestampMillis();
+
+    // rollback the table state to the first snapshot
+    table.rollback().toSnapshotId(firstSnapshotId).commit();
+
+    Dataset<Row> actualDf = spark.read()
+        .format("iceberg")
+        .load(loadLocation(tableIdentifier, "snapshots"))
+        .select("operation", "committed_at", "summary", "parent_id");
+
+    Schema projectedSchema = SparkSchemaUtil.convert(actualDf.schema());
+
+    List<Row> actual = actualDf.collectAsList();
+
+    GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(projectedSchema, "snapshots"));
+    List<GenericData.Record> expected = Lists.newArrayList(
+        builder.set("committed_at", firstSnapshotTimestamp * 1000)
+            .set("parent_id", null)
+            .set("operation", "append")
+            .set("summary", ImmutableMap.of(
+                "added-records", "1",
+                "added-data-files", "1",
+                "changed-partition-count", "1",
+                "total-data-files", "1",
+                "total-records", "1"
+            ))
+            .build(),
+        builder.set("committed_at", secondSnapshotTimestamp * 1000)
+            .set("parent_id", firstSnapshotId)
+            .set("operation", "delete")
+            .set("summary", ImmutableMap.of(
+                "deleted-records", "1",
+                "deleted-data-files", "1",
+                "changed-partition-count", "1",
+                "total-records", "0",
+                "total-data-files", "0"
+            ))
+            .build()
+    );
+
+    Assert.assertEquals("Snapshots table should have a row for each snapshot", 2, actual.size());
+    TestHelpers.assertEqualsSafe(projectedSchema.asStruct(), expected.get(0), actual.get(0));
+    TestHelpers.assertEqualsSafe(projectedSchema.asStruct(), expected.get(1), actual.get(1));
+  }
+
+  @Test
   public void testManifestsTable() {
     TableIdentifier tableIdentifier = TableIdentifier.of("db", "manifests_test");
     Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build());
@@ -725,6 +876,66 @@ public abstract class TestIcebergSourceTablesBase extends SparkTestBase {
   }
 
   @Test
+  public void testPruneManifestsTable() {
+    TableIdentifier tableIdentifier = TableIdentifier.of("db", "manifests_test");
+    Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build());
+    Table manifestTable = loadTable(tableIdentifier, "manifests");
+    Dataset<Row> df1 = spark.createDataFrame(
+        Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(null, "b")), SimpleRecord.class);
+
+    df1.select("id", "data").write()
+        .format("iceberg")
+        .mode("append")
+        .save(loadLocation(tableIdentifier));
+
+    if (!spark.version().startsWith("2")) {
+      // Spark 2 isn't able to actually push down nested struct projections so this will not break
+      AssertHelpers.assertThrows("Can't prune struct inside list", SparkException.class,
+          "Cannot project a partial list element struct",
+          () -> spark.read()
+              .format("iceberg")
+              .load(loadLocation(tableIdentifier, "manifests"))
+              .select("partition_spec_id", "path", "partition_summaries.contains_null")
+              .collectAsList());
+    }
+
+    Dataset<Row> actualDf = spark.read()
+        .format("iceberg")
+        .load(loadLocation(tableIdentifier, "manifests"))
+        .select("partition_spec_id", "path", "partition_summaries");
+
+    Schema projectedSchema = SparkSchemaUtil.convert(actualDf.schema());
+
+    List<Row> actual = spark.read()
+        .format("iceberg")
+        .load(loadLocation(tableIdentifier, "manifests"))
+        .select("partition_spec_id", "path", "partition_summaries")
+        .collectAsList();
+
+    table.refresh();
+
+    GenericRecordBuilder builder = new GenericRecordBuilder(AvroSchemaUtil.convert(projectedSchema.asStruct()));
+    GenericRecordBuilder summaryBuilder = new GenericRecordBuilder(AvroSchemaUtil.convert(
+        projectedSchema.findType("partition_summaries.element").asStructType(), "partition_summary"));
+    List<GenericData.Record> expected = Lists.transform(table.currentSnapshot().allManifests(), manifest ->
+        builder.set("partition_spec_id", manifest.partitionSpecId())
+            .set("path", manifest.path())
+            .set("partition_summaries", Lists.transform(manifest.partitions(), partition ->
+                summaryBuilder
+                    .set("contains_null", true)
+                    .set("contains_nan", false)
+                    .set("lower_bound", "1")
+                    .set("upper_bound", "1")
+                    .build()
+            ))
+            .build()
+    );
+
+    Assert.assertEquals("Manifests table should have one manifest row", 1, actual.size());
+    TestHelpers.assertEqualsSafe(projectedSchema.asStruct(), expected.get(0), actual.get(0));
+  }
+
+  @Test
   public void testAllManifestsTable() {
     TableIdentifier tableIdentifier = TableIdentifier.of("db", "manifests_test");
     Table table = createTable(tableIdentifier, SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build());