You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/07/16 16:44:21 UTC

[incubator-iceberg] branch master updated: Fix data files table for unpartitioned tables (#285)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 33a3882  Fix data files table for unpartitioned tables (#285)
33a3882 is described below

commit 33a38826f6c104e52981df15d49f1ee420e04c49
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Tue Jul 16 09:44:16 2019 -0700

    Fix data files table for unpartitioned tables (#285)
---
 .../main/java/org/apache/iceberg/Filterable.java   | 34 +++++++++++
 .../java/org/apache/iceberg/DataFilesTable.java    | 36 +++++++++---
 .../java/org/apache/iceberg/DataTableScan.java     |  8 +--
 .../java/org/apache/iceberg/FilteredManifest.java  | 63 ++++++++++++++-------
 .../org/apache/iceberg/ManifestEntriesTable.java   | 16 ++++--
 .../java/org/apache/iceberg/ManifestEntry.java     |  2 +-
 .../java/org/apache/iceberg/ManifestReader.java    | 66 ++++++++--------------
 .../org/apache/iceberg/spark/source/Reader.java    |  7 ++-
 .../spark/source/TestIcebergSourceHiveTables.java  | 54 ++++++++++++++++++
 9 files changed, 201 insertions(+), 85 deletions(-)

diff --git a/api/src/main/java/org/apache/iceberg/Filterable.java b/api/src/main/java/org/apache/iceberg/Filterable.java
index 7be5cf1..d53a318 100644
--- a/api/src/main/java/org/apache/iceberg/Filterable.java
+++ b/api/src/main/java/org/apache/iceberg/Filterable.java
@@ -61,6 +61,14 @@ public interface Filterable<T extends Filterable<T>> extends CloseableIterable<D
   T select(Collection<String> columns);
 
   /**
+   * Set the projection from a schema.
+   *
+   * @param fileProjection a projection of the DataFile schema
+   * @return a Filterable that will load only the given schema's columns
+   */
+  T project(Schema fileProjection);
+
+  /**
    * Adds a filter expression on partition data for matching files.
    * <p>
    * If the Filterable object already has partition filters, the new filter will be added as an
@@ -90,4 +98,30 @@ public interface Filterable<T extends Filterable<T>> extends CloseableIterable<D
    * @return a Filterable that will load only rows that match expr
    */
   T filterRows(Expression expr);
+
+  /**
+   * Sets case sensitivity.
+   *
+   * @param isCaseSensitive true if expression binding and schema projection should be case sensitive
+   * @return a Filterable that will use the specified case sensitivity
+   */
+  T caseSensitive(boolean isCaseSensitive);
+
+  /**
+   * Sets case sensitive binding and projection.
+   *
+   * @return a Filterable that will case sensitive binding and projection
+   */
+  default T caseSensitive() {
+    return caseSensitive(true);
+  }
+
+  /**
+   * Sets case insensitive binding and projection.
+   *
+   * @return a Filterable that will case insensitive binding and projection
+   */
+  default T caseInsensitive() {
+    return caseSensitive(false);
+  }
 }
diff --git a/core/src/main/java/org/apache/iceberg/DataFilesTable.java b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
index 63ca855..6958992 100644
--- a/core/src/main/java/org/apache/iceberg/DataFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
@@ -20,12 +20,14 @@
 package org.apache.iceberg;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
 import java.util.Collection;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.ResidualEvaluator;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.types.TypeUtil;
 
 /**
  * A {@link Table} implementation that exposes a table's data files as rows.
@@ -51,12 +53,18 @@ class DataFilesTable extends BaseMetadataTable {
 
   @Override
   public TableScan newScan() {
-    return new FilesTableScan(ops, table);
+    return new FilesTableScan(ops, table, schema());
   }
 
   @Override
   public Schema schema() {
-    return new Schema(DataFile.getType(table.spec().partitionType()).fields());
+    Schema schema = new Schema(DataFile.getType(table.spec().partitionType()).fields());
+    if (table.spec().fields().size() < 1) {
+      // avoid returning an empty struct, which is not always supported. instead, drop the partition field (id 102)
+      return TypeUtil.selectNot(schema, Sets.newHashSet(102));
+    } else {
+      return schema;
+    }
   }
 
   @Override
@@ -66,15 +74,18 @@ class DataFilesTable extends BaseMetadataTable {
 
   public static class FilesTableScan extends BaseTableScan {
     private static final long TARGET_SPLIT_SIZE = 32 * 1024 * 1024; // 32 MB
+    private final Schema fileSchema;
 
-    FilesTableScan(TableOperations ops, Table table) {
-      super(ops, table, ManifestEntry.getSchema(table.spec().partitionType()));
+    FilesTableScan(TableOperations ops, Table table, Schema fileSchema) {
+      super(ops, table, fileSchema);
+      this.fileSchema = fileSchema;
     }
 
     private FilesTableScan(
         TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
-        boolean caseSensitive, boolean colStats, Collection<String> selectedColumns) {
+        boolean caseSensitive, boolean colStats, Collection<String> selectedColumns, Schema fileSchema) {
       super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns);
+      this.fileSchema = fileSchema;
     }
 
     @Override
@@ -82,7 +93,7 @@ class DataFilesTable extends BaseMetadataTable {
         TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
         boolean caseSensitive, boolean colStats, Collection<String> selectedColumns) {
       return new FilesTableScan(
-          ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns);
+          ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, fileSchema);
     }
 
     @Override
@@ -106,25 +117,32 @@ class DataFilesTable extends BaseMetadataTable {
       String schemaString = SchemaParser.toJson(schema());
       String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
 
+      // Data tasks produce the table schema, not the projection schema and projection is done by processing engines.
+      // This data task needs to use the table schema, which may not include a partition schema to avoid having an
+      // empty struct in the schema for unpartitioned tables. Some engines, like Spark, can't handle empty structs in
+      // all cases.
       return CloseableIterable.transform(manifests, manifest ->
           new ManifestReadTask(ops.io(), new BaseFileScanTask(
-              DataFiles.fromManifest(manifest), schemaString, specString, ResidualEvaluator.unpartitioned(rowFilter))));
+              DataFiles.fromManifest(manifest), schemaString, specString, ResidualEvaluator.unpartitioned(rowFilter)),
+              fileSchema));
     }
   }
 
   private static class ManifestReadTask implements DataTask {
     private final FileIO io;
     private final FileScanTask manifestTask;
+    private final Schema schema;
 
-    private ManifestReadTask(FileIO io, FileScanTask manifestTask) {
+    private ManifestReadTask(FileIO io, FileScanTask manifestTask, Schema schema) {
       this.io = io;
       this.manifestTask = manifestTask;
+      this.schema = schema;
     }
 
     @Override
     public CloseableIterable<StructLike> rows() {
       return CloseableIterable.transform(
-          ManifestReader.read(io.newInputFile(manifestTask.file().path().toString())),
+          ManifestReader.read(io.newInputFile(manifestTask.file().path().toString())).project(schema),
           file -> (GenericDataFile) file);
     }
 
diff --git a/core/src/main/java/org/apache/iceberg/DataTableScan.java b/core/src/main/java/org/apache/iceberg/DataTableScan.java
index 8ae9b31..4e9e82f 100644
--- a/core/src/main/java/org/apache/iceberg/DataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/DataTableScan.java
@@ -81,15 +81,15 @@ public class DataTableScan extends BaseTableScan {
     Iterable<CloseableIterable<FileScanTask>> readers = Iterables.transform(
         matchingManifests,
         manifest -> {
-          ManifestReader reader = ManifestReader
-              .read(ops.io().newInputFile(manifest.path()), ops.current()::spec)
-              .caseSensitive(caseSensitive);
+          ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path()), ops.current()::spec);
           PartitionSpec spec = ops.current().spec(manifest.partitionSpecId());
           String schemaString = SchemaParser.toJson(spec.schema());
           String specString = PartitionSpecParser.toJson(spec);
           ResidualEvaluator residuals = ResidualEvaluator.of(spec, rowFilter, caseSensitive);
           return CloseableIterable.transform(
-              reader.filterRows(rowFilter).select(colStats ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS),
+              reader.filterRows(rowFilter)
+                  .caseSensitive(caseSensitive)
+                  .select(colStats ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS),
               file -> new BaseFileScanTask(file, schemaString, specString, residuals)
           );
         });
diff --git a/core/src/main/java/org/apache/iceberg/FilteredManifest.java b/core/src/main/java/org/apache/iceberg/FilteredManifest.java
index 73bb86f..31cc11b 100644
--- a/core/src/main/java/org/apache/iceberg/FilteredManifest.java
+++ b/core/src/main/java/org/apache/iceberg/FilteredManifest.java
@@ -43,6 +43,7 @@ public class FilteredManifest implements Filterable<FilteredManifest> {
   private final ManifestReader reader;
   private final Expression partFilter;
   private final Expression rowFilter;
+  private final Schema fileSchema;
   private final Collection<String> columns;
   private final boolean caseSensitive;
 
@@ -51,37 +52,41 @@ public class FilteredManifest implements Filterable<FilteredManifest> {
   private InclusiveMetricsEvaluator lazyMetricsEvaluator = null;
 
   FilteredManifest(ManifestReader reader, Expression partFilter, Expression rowFilter,
-                   Collection<String> columns, boolean caseSensitive) {
+                   Schema fileSchema, Collection<String> columns, boolean caseSensitive) {
     Preconditions.checkNotNull(reader, "ManifestReader cannot be null");
     this.reader = reader;
     this.partFilter = partFilter;
     this.rowFilter = rowFilter;
+    this.fileSchema = fileSchema;
     this.columns = columns;
     this.caseSensitive = caseSensitive;
   }
 
   @Override
   public FilteredManifest select(Collection<String> selectedColumns) {
-    return new FilteredManifest(reader, partFilter, rowFilter, selectedColumns, caseSensitive);
+    return new FilteredManifest(reader, partFilter, rowFilter, fileSchema, selectedColumns, caseSensitive);
+  }
+
+  @Override
+  public FilteredManifest project(Schema fileProjection) {
+    return new FilteredManifest(reader, partFilter, rowFilter, fileProjection, columns, caseSensitive);
   }
 
   @Override
   public FilteredManifest filterPartitions(Expression expr) {
-    return new FilteredManifest(reader,
-        Expressions.and(partFilter, expr),
-        rowFilter,
-        columns,
-        caseSensitive);
+    return new FilteredManifest(
+        reader, Expressions.and(partFilter, expr), rowFilter, fileSchema, columns, caseSensitive);
   }
 
   @Override
   public FilteredManifest filterRows(Expression expr) {
-    Expression projected = Projections.inclusive(reader.spec(), caseSensitive).project(expr);
-    return new FilteredManifest(reader,
-        Expressions.and(partFilter, projected),
-        Expressions.and(rowFilter, expr),
-        columns,
-        caseSensitive);
+    return new FilteredManifest(
+        reader, partFilter, Expressions.and(rowFilter, expr), fileSchema, columns, caseSensitive);
+  }
+
+  @Override
+  public FilteredManifest caseSensitive(boolean isCaseSensitive) {
+    return new FilteredManifest(reader, partFilter, rowFilter, fileSchema, columns, isCaseSensitive);
   }
 
   CloseableIterable<ManifestEntry> allEntries() {
@@ -90,13 +95,13 @@ public class FilteredManifest implements Filterable<FilteredManifest> {
       Evaluator evaluator = evaluator();
       InclusiveMetricsEvaluator metricsEvaluator = metricsEvaluator();
 
-      return CloseableIterable.filter(reader.entries(columns),
+      return CloseableIterable.filter(reader.entries(projection(fileSchema, columns, caseSensitive)),
           entry -> entry != null &&
               evaluator.eval(entry.file().partition()) &&
               metricsEvaluator.eval(entry.file()));
 
     } else {
-      return reader.entries(columns);
+      return reader.entries(projection(fileSchema, columns, caseSensitive));
     }
   }
 
@@ -106,14 +111,14 @@ public class FilteredManifest implements Filterable<FilteredManifest> {
       Evaluator evaluator = evaluator();
       InclusiveMetricsEvaluator metricsEvaluator = metricsEvaluator();
 
-      return CloseableIterable.filter(reader.entries(columns),
+      return CloseableIterable.filter(reader.entries(projection(fileSchema, columns, caseSensitive)),
           entry -> entry != null &&
               entry.status() != Status.DELETED &&
               evaluator.eval(entry.file().partition()) &&
               metricsEvaluator.eval(entry.file()));
 
     } else {
-      return CloseableIterable.filter(reader.entries(columns),
+      return CloseableIterable.filter(reader.entries(projection(fileSchema, columns, caseSensitive)),
           entry -> entry != null && entry.status() != Status.DELETED);
     }
   }
@@ -133,14 +138,16 @@ public class FilteredManifest implements Filterable<FilteredManifest> {
       boolean dropStats = Sets.intersection(Sets.newHashSet(columns), STATS_COLUMNS).isEmpty();
 
       return Iterators.transform(
-          Iterators.filter(reader.iterator(partFilter, projectColumns),
+          Iterators.filter(reader.iterator(partFilter, projection(fileSchema, projectColumns, caseSensitive)),
               input -> input != null &&
                   evaluator.eval(input.partition()) &&
                   metricsEvaluator.eval(input)),
           dropStats ? DataFile::copyWithoutStats : DataFile::copy);
 
     } else {
-      return Iterators.transform(reader.iterator(partFilter, columns), DataFile::copy);
+      return Iterators.transform(
+          reader.iterator(partFilter, projection(fileSchema, columns, caseSensitive)),
+          DataFile::copy);
     }
   }
 
@@ -149,10 +156,24 @@ public class FilteredManifest implements Filterable<FilteredManifest> {
     reader.close();
   }
 
+  private static Schema projection(Schema fileSchema, Collection<String> columns, boolean caseSensitive) {
+    if (columns != null) {
+      if (caseSensitive) {
+        return fileSchema.select(columns);
+      } else {
+        return fileSchema.caseInsensitiveSelect(columns);
+      }
+    }
+
+    return fileSchema;
+  }
+
   private Evaluator evaluator() {
     if (lazyEvaluator == null) {
-      if (partFilter != null) {
-        this.lazyEvaluator = new Evaluator(reader.spec().partitionType(), partFilter, caseSensitive);
+      Expression projected = Projections.inclusive(reader.spec(), caseSensitive).project(rowFilter);
+      Expression finalPartFilter = Expressions.and(projected, partFilter);
+      if (finalPartFilter != null) {
+        this.lazyEvaluator = new Evaluator(reader.spec().partitionType(), finalPartFilter, caseSensitive);
       } else {
         this.lazyEvaluator = new Evaluator(reader.spec().partitionType(), Expressions.alwaysTrue(), caseSensitive);
       }
diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
index 848b0c3..f3fe85e 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
@@ -19,11 +19,13 @@
 
 package org.apache.iceberg;
 
+import com.google.common.collect.Sets;
 import java.util.Collection;
 import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.ResidualEvaluator;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.types.TypeUtil;
 
 /**
  * A {@link Table} implementation that exposes a table's manifest entries as rows.
@@ -52,12 +54,18 @@ class ManifestEntriesTable extends BaseMetadataTable {
 
   @Override
   public TableScan newScan() {
-    return new EntriesTableScan(ops, table);
+    return new EntriesTableScan(ops, table, schema());
   }
 
   @Override
   public Schema schema() {
-    return ManifestEntry.getSchema(table.spec().partitionType());
+    Schema schema = ManifestEntry.getSchema(table.spec().partitionType());
+    if (table.spec().fields().size() < 1) {
+      // avoid returning an empty struct, which is not always supported. instead, drop the partition field (id 102)
+      return TypeUtil.selectNot(schema, Sets.newHashSet(102));
+    } else {
+      return schema;
+    }
   }
 
   @Override
@@ -68,8 +76,8 @@ class ManifestEntriesTable extends BaseMetadataTable {
   private static class EntriesTableScan extends BaseTableScan {
     private static final long TARGET_SPLIT_SIZE = 32 * 1024 * 1024; // 32 MB
 
-    EntriesTableScan(TableOperations ops, Table table) {
-      super(ops, table, ManifestEntry.getSchema(table.spec().partitionType()));
+    EntriesTableScan(TableOperations ops, Table table, Schema schema) {
+      super(ops, table, schema);
     }
 
     private EntriesTableScan(
diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntry.java b/core/src/main/java/org/apache/iceberg/ManifestEntry.java
index 584f007..c296c97 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestEntry.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestEntry.java
@@ -166,7 +166,7 @@ class ManifestEntry implements IndexedRecord, SpecificData.SchemaConstructable {
         new Schema(DataFile.getType(partitionType).fields()).select(columns).asStruct());
   }
 
-  private static Schema wrapFileSchema(StructType fileStruct) {
+  static Schema wrapFileSchema(StructType fileStruct) {
     // ids for top-level columns are assigned from 1000
     return new Schema(
         required(0, "status", IntegerType.get()),
diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java
index 781352e..dceac87 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -32,7 +32,6 @@ import org.apache.iceberg.avro.Avro;
 import org.apache.iceberg.avro.AvroIterable;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.expressions.Projections;
 import org.apache.iceberg.io.CloseableGroup;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.InputFile;
@@ -73,8 +72,7 @@ public class ManifestReader extends CloseableGroup implements Filterable<Filtere
   private final InputFile file;
   private final Map<String, String> metadata;
   private final PartitionSpec spec;
-  private final Schema schema;
-  private final boolean caseSensitive;
+  private final Schema fileSchema;
 
   // lazily initialized
   private List<ManifestEntry> cachedAdds = null;
@@ -101,34 +99,12 @@ public class ManifestReader extends CloseableGroup implements Filterable<Filtere
 
     if (specLookup != null) {
       this.spec = specLookup.apply(specId);
-      this.schema = spec.schema();
     } else {
-      this.schema = SchemaParser.fromJson(metadata.get("schema"));
+      Schema schema = SchemaParser.fromJson(metadata.get("schema"));
       this.spec = PartitionSpecParser.fromJsonFields(schema, specId, metadata.get("partition-spec"));
     }
 
-    this.caseSensitive = true;
-  }
-
-  private ManifestReader(InputFile file, Map<String, String> metadata,
-                         PartitionSpec spec, Schema schema, boolean caseSensitive) {
-    this.file = file;
-    this.metadata = metadata;
-    this.spec = spec;
-    this.schema = schema;
-    this.caseSensitive = caseSensitive;
-  }
-
-  /**
-   * Returns a new {@link ManifestReader} that, if filtered via {@link #select(java.util.Collection)},
-   * {@link #filterPartitions(Expression)} or {@link #filterRows(Expression)}, will apply the specified
-   * case sensitivity for column name matching.
-   *
-   * @param readCaseSensitive whether column name matching should have case sensitivity
-   * @return a manifest reader with case sensitivity as stated
-   */
-  public ManifestReader caseSensitive(boolean readCaseSensitive) {
-    return new ManifestReader(file, metadata, spec, schema, readCaseSensitive);
+    this.fileSchema = new Schema(DataFile.getType(spec.partitionType()).fields());
   }
 
   public InputFile file() {
@@ -136,7 +112,7 @@ public class ManifestReader extends CloseableGroup implements Filterable<Filtere
   }
 
   public Schema schema() {
-    return schema;
+    return fileSchema;
   }
 
   public PartitionSpec spec() {
@@ -145,21 +121,26 @@ public class ManifestReader extends CloseableGroup implements Filterable<Filtere
 
   @Override
   public FilteredManifest select(Collection<String> columns) {
-    return new FilteredManifest(this, alwaysTrue(), alwaysTrue(), Lists.newArrayList(columns), caseSensitive);
+    return new FilteredManifest(this, alwaysTrue(), alwaysTrue(), fileSchema, columns, true);
+  }
+
+  @Override
+  public FilteredManifest project(Schema fileProjection) {
+    return new FilteredManifest(this, alwaysTrue(), alwaysTrue(), fileProjection, ALL_COLUMNS, true);
   }
 
   @Override
   public FilteredManifest filterPartitions(Expression expr) {
-    return new FilteredManifest(this, expr, alwaysTrue(), ALL_COLUMNS, caseSensitive);
+    return new FilteredManifest(this, expr, alwaysTrue(), fileSchema, ALL_COLUMNS, true);
   }
 
   @Override
   public FilteredManifest filterRows(Expression expr) {
-    return new FilteredManifest(this,
-      Projections.inclusive(spec, caseSensitive).project(expr),
-      expr,
-      ALL_COLUMNS,
-      caseSensitive);
+    return new FilteredManifest(this, alwaysTrue(), expr, fileSchema, ALL_COLUMNS, true);
+  }
+
+  public FilteredManifest caseSensitive(boolean caseSensitive) {
+    return new FilteredManifest(this, alwaysTrue(), alwaysTrue(), fileSchema, ALL_COLUMNS, caseSensitive);
   }
 
   public List<ManifestEntry> addedFiles() {
@@ -180,7 +161,7 @@ public class ManifestReader extends CloseableGroup implements Filterable<Filtere
     List<ManifestEntry> adds = Lists.newArrayList();
     List<ManifestEntry> deletes = Lists.newArrayList();
 
-    for (ManifestEntry entry : entries(CHANGE_COLUNNS)) {
+    for (ManifestEntry entry : entries(fileSchema.select(CHANGE_COLUNNS))) {
       switch (entry.status()) {
         case ADDED:
           adds.add(entry.copyWithoutStats());
@@ -197,18 +178,17 @@ public class ManifestReader extends CloseableGroup implements Filterable<Filtere
   }
 
   CloseableIterable<ManifestEntry> entries() {
-    return entries(ALL_COLUMNS);
+    return entries(fileSchema);
   }
 
-  CloseableIterable<ManifestEntry> entries(Collection<String> columns) {
+  CloseableIterable<ManifestEntry> entries(Schema fileProjection) {
     FileFormat format = FileFormat.fromFileName(file.location());
     Preconditions.checkArgument(format != null, "Unable to determine format of manifest: %s", file);
 
-    Schema projectedSchema = ManifestEntry.projectSchema(spec.partitionType(), columns);
     switch (format) {
       case AVRO:
         AvroIterable<ManifestEntry> reader = Avro.read(file)
-            .project(projectedSchema)
+            .project(ManifestEntry.wrapFileSchema(fileProjection.asStruct()))
             .rename("manifest_entry", ManifestEntry.class.getName())
             .rename("partition", PartitionData.class.getName())
             .rename("r102", PartitionData.class.getName())
@@ -228,13 +208,13 @@ public class ManifestReader extends CloseableGroup implements Filterable<Filtere
 
   @Override
   public Iterator<DataFile> iterator() {
-    return iterator(alwaysTrue(), ALL_COLUMNS);
+    return iterator(alwaysTrue(), fileSchema);
   }
 
   // visible for use by PartialManifest
-  Iterator<DataFile> iterator(Expression partFilter, Collection<String> columns) {
+  Iterator<DataFile> iterator(Expression partFilter, Schema fileProjection) {
     return Iterables.transform(Iterables.filter(
-        entries(columns),
+        entries(fileProjection),
         entry -> entry.status() != ManifestEntry.Status.DELETED),
         ManifestEntry::file).iterator();
   }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
index e0b2851..91359ff 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -506,9 +506,10 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
 
     private CloseableIterable<InternalRow> newDataIterable(DataTask task, Schema readSchema) {
       StructInternalRow row = new StructInternalRow(tableSchema.asStruct());
-      Iterable<InternalRow> asSparkRows = Iterables.transform(task.asDataTask().rows(), row::setStruct);
-      return CloseableIterable.withNoopClose(
-          Iterables.transform(asSparkRows, APPLY_PROJECTION.bind(projection(readSchema, tableSchema))::invoke));
+      CloseableIterable<InternalRow> asSparkRows = CloseableIterable.transform(
+          task.asDataTask().rows(), row::setStruct);
+      return CloseableIterable.transform(
+          asSparkRows, APPLY_PROJECTION.bind(projection(readSchema, tableSchema))::invoke);
     }
   }
 
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
index 5a50731..81e2326 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
@@ -30,6 +30,7 @@ import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.iceberg.DataFile;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -217,6 +218,59 @@ public class TestIcebergSourceHiveTables {
   }
 
   @Test
+  public void testHiveFilesUnpartitionedTable() throws TException, IOException {
+    try (HiveCatalog catalog = new HiveCatalog(hiveConf)) {
+      Table table = catalog.createTable(TABLE_IDENTIFIER, SCHEMA);
+      Table entriesTable = catalog.loadTable(TableIdentifier.of(DB_NAME, TABLE_NAME, "entries"));
+      Table filesTable = catalog.loadTable(TableIdentifier.of(DB_NAME, TABLE_NAME, "files"));
+
+      Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
+      Dataset<Row> df2 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);
+
+      df1.select("id", "data").write()
+          .format("iceberg")
+          .mode("append")
+          .save(TABLE_IDENTIFIER.toString());
+
+      table.refresh();
+      DataFile toDelete = Iterables.getOnlyElement(table.currentSnapshot().addedFiles());
+
+      // add a second file
+      df2.select("id", "data").write()
+          .format("iceberg")
+          .mode("append")
+          .save(TABLE_IDENTIFIER.toString());
+
+      // delete the first file to test that only live files are listed
+      table.newDelete().deleteFile(toDelete).commit();
+
+      List<Row> actual = spark.read()
+          .format("iceberg")
+          .load(DB_NAME + "." + TABLE_NAME + ".files")
+          .collectAsList();
+
+      List<GenericData.Record> expected = Lists.newArrayList();
+      for (ManifestFile manifest : table.currentSnapshot().manifests()) {
+        InputFile in = table.io().newInputFile(manifest.path());
+        try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
+          for (GenericData.Record record : rows) {
+            if ((Integer) record.get("status") < 2 /* added or existing */) {
+              expected.add((GenericData.Record) record.get("data_file"));
+            }
+          }
+        }
+      }
+
+      Assert.assertEquals("Files table should have one row", 1, expected.size());
+      Assert.assertEquals("Actual results should have one row", 1, actual.size());
+      TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(0), actual.get(0));
+
+    } finally {
+      metastoreClient.dropTable(DB_NAME, TABLE_NAME);
+    }
+  }
+
+  @Test
   public void testHiveHistoryTable() throws TException {
     try (HiveCatalog catalog = new HiveCatalog(hiveConf)) {
       Table table = catalog.createTable(TABLE_IDENTIFIER, SCHEMA, PartitionSpec.unpartitioned());