Posted to commits@iceberg.apache.org by bl...@apache.org on 2019/07/16 16:44:21 UTC
[incubator-iceberg] branch master updated: Fix data files table for unpartitioned tables (#285)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 33a3882 Fix data files table for unpartitioned tables (#285)
33a3882 is described below
commit 33a38826f6c104e52981df15d49f1ee420e04c49
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Tue Jul 16 09:44:16 2019 -0700
Fix data files table for unpartitioned tables (#285)
---
.../main/java/org/apache/iceberg/Filterable.java | 34 +++++++++++
.../java/org/apache/iceberg/DataFilesTable.java | 36 +++++++++---
.../java/org/apache/iceberg/DataTableScan.java | 8 +--
.../java/org/apache/iceberg/FilteredManifest.java | 63 ++++++++++++++-------
.../org/apache/iceberg/ManifestEntriesTable.java | 16 ++++--
.../java/org/apache/iceberg/ManifestEntry.java | 2 +-
.../java/org/apache/iceberg/ManifestReader.java | 66 ++++++++--------------
.../org/apache/iceberg/spark/source/Reader.java | 7 ++-
.../spark/source/TestIcebergSourceHiveTables.java | 54 ++++++++++++++++++
9 files changed, 201 insertions(+), 85 deletions(-)
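For context, the failure this fixes surfaced when querying the "files" metadata table of an unpartitioned table. A minimal reproduction sketch (illustrative; "db.table" is a placeholder, mirroring the new test below):

    // Before this commit, the files table of an unpartitioned table exposed
    // an empty partition struct, which Spark cannot handle in all cases.
    Dataset<Row> files = spark.read()
        .format("iceberg")
        .load("db.table.files");
    files.show();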
diff --git a/api/src/main/java/org/apache/iceberg/Filterable.java b/api/src/main/java/org/apache/iceberg/Filterable.java
index 7be5cf1..d53a318 100644
--- a/api/src/main/java/org/apache/iceberg/Filterable.java
+++ b/api/src/main/java/org/apache/iceberg/Filterable.java
@@ -61,6 +61,14 @@ public interface Filterable<T extends Filterable<T>> extends CloseableIterable<D
T select(Collection<String> columns);
/**
+ * Set the projection from a schema.
+ *
+ * @param fileProjection a projection of the DataFile schema
+ * @return a Filterable that will load only the given schema's columns
+ */
+ T project(Schema fileProjection);
+
+ /**
* Adds a filter expression on partition data for matching files.
* <p>
* If the Filterable object already has partition filters, the new filter will be added as an
@@ -90,4 +98,30 @@ public interface Filterable<T extends Filterable<T>> extends CloseableIterable<D
* @return a Filterable that will load only rows that match expr
*/
T filterRows(Expression expr);
+
+ /**
+ * Sets case sensitivity.
+ *
+ * @param isCaseSensitive true if expression binding and schema projection should be case sensitive
+ * @return a Filterable that will use the specified case sensitivity
+ */
+ T caseSensitive(boolean isCaseSensitive);
+
+ /**
+ * Sets case sensitive binding and projection.
+ *
+ * @return a Filterable that will use case sensitive binding and projection
+ */
+ default T caseSensitive() {
+ return caseSensitive(true);
+ }
+
+ /**
+ * Sets case insensitive binding and projection.
+ *
+ * @return a Filterable that will use case insensitive binding and projection
+ */
+ default T caseInsensitive() {
+ return caseSensitive(false);
+ }
}
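A sketch of how the extended Filterable API composes (illustrative; assumes an existing ManifestReader named reader and a DataFile-schema projection fileProjection):

    // Each call returns a new Filterable, so the chain is side-effect free.
    CloseableIterable<DataFile> files = reader
        .project(fileProjection)                    // load only these columns
        .caseInsensitive()                          // bind and project ignoring case
        .filterRows(Expressions.lessThan("id", 5)); // keep files that may contain matching rows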
diff --git a/core/src/main/java/org/apache/iceberg/DataFilesTable.java b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
index 63ca855..6958992 100644
--- a/core/src/main/java/org/apache/iceberg/DataFilesTable.java
+++ b/core/src/main/java/org/apache/iceberg/DataFilesTable.java
@@ -20,12 +20,14 @@
package org.apache.iceberg;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
import java.util.Collection;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.types.TypeUtil;
/**
* A {@link Table} implementation that exposes a table's data files as rows.
@@ -51,12 +53,18 @@ class DataFilesTable extends BaseMetadataTable {
@Override
public TableScan newScan() {
- return new FilesTableScan(ops, table);
+ return new FilesTableScan(ops, table, schema());
}
@Override
public Schema schema() {
- return new Schema(DataFile.getType(table.spec().partitionType()).fields());
+ Schema schema = new Schema(DataFile.getType(table.spec().partitionType()).fields());
+ if (table.spec().fields().size() < 1) {
+ // avoid returning an empty struct, which is not always supported. instead, drop the partition field (id 102)
+ return TypeUtil.selectNot(schema, Sets.newHashSet(102));
+ } else {
+ return schema;
+ }
}
@Override
@@ -66,15 +74,18 @@ class DataFilesTable extends BaseMetadataTable {
public static class FilesTableScan extends BaseTableScan {
private static final long TARGET_SPLIT_SIZE = 32 * 1024 * 1024; // 32 MB
+ private final Schema fileSchema;
- FilesTableScan(TableOperations ops, Table table) {
- super(ops, table, ManifestEntry.getSchema(table.spec().partitionType()));
+ FilesTableScan(TableOperations ops, Table table, Schema fileSchema) {
+ super(ops, table, fileSchema);
+ this.fileSchema = fileSchema;
}
private FilesTableScan(
TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
- boolean caseSensitive, boolean colStats, Collection<String> selectedColumns) {
+ boolean caseSensitive, boolean colStats, Collection<String> selectedColumns, Schema fileSchema) {
super(ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns);
+ this.fileSchema = fileSchema;
}
@Override
@@ -82,7 +93,7 @@ class DataFilesTable extends BaseMetadataTable {
TableOperations ops, Table table, Long snapshotId, Schema schema, Expression rowFilter,
boolean caseSensitive, boolean colStats, Collection<String> selectedColumns) {
return new FilesTableScan(
- ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns);
+ ops, table, snapshotId, schema, rowFilter, caseSensitive, colStats, selectedColumns, fileSchema);
}
@Override
@@ -106,25 +117,32 @@ class DataFilesTable extends BaseMetadataTable {
String schemaString = SchemaParser.toJson(schema());
String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
+ // Data tasks produce the table schema, not the projection schema; projection is done by processing engines.
+ // This data task needs to use the table schema, which may omit the partition struct so that unpartitioned
+ // tables do not expose an empty struct. Some engines, like Spark, can't handle empty structs in all cases.
return CloseableIterable.transform(manifests, manifest ->
new ManifestReadTask(ops.io(), new BaseFileScanTask(
- DataFiles.fromManifest(manifest), schemaString, specString, ResidualEvaluator.unpartitioned(rowFilter))));
+ DataFiles.fromManifest(manifest), schemaString, specString, ResidualEvaluator.unpartitioned(rowFilter)),
+ fileSchema));
}
}
private static class ManifestReadTask implements DataTask {
private final FileIO io;
private final FileScanTask manifestTask;
+ private final Schema schema;
- private ManifestReadTask(FileIO io, FileScanTask manifestTask) {
+ private ManifestReadTask(FileIO io, FileScanTask manifestTask, Schema schema) {
this.io = io;
this.manifestTask = manifestTask;
+ this.schema = schema;
}
@Override
public CloseableIterable<StructLike> rows() {
return CloseableIterable.transform(
- ManifestReader.read(io.newInputFile(manifestTask.file().path().toString())),
+ ManifestReader.read(io.newInputFile(manifestTask.file().path().toString())).project(schema),
file -> (GenericDataFile) file);
}
diff --git a/core/src/main/java/org/apache/iceberg/DataTableScan.java b/core/src/main/java/org/apache/iceberg/DataTableScan.java
index 8ae9b31..4e9e82f 100644
--- a/core/src/main/java/org/apache/iceberg/DataTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/DataTableScan.java
@@ -81,15 +81,15 @@ public class DataTableScan extends BaseTableScan {
Iterable<CloseableIterable<FileScanTask>> readers = Iterables.transform(
matchingManifests,
manifest -> {
- ManifestReader reader = ManifestReader
- .read(ops.io().newInputFile(manifest.path()), ops.current()::spec)
- .caseSensitive(caseSensitive);
+ ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path()), ops.current()::spec);
PartitionSpec spec = ops.current().spec(manifest.partitionSpecId());
String schemaString = SchemaParser.toJson(spec.schema());
String specString = PartitionSpecParser.toJson(spec);
ResidualEvaluator residuals = ResidualEvaluator.of(spec, rowFilter, caseSensitive);
return CloseableIterable.transform(
- reader.filterRows(rowFilter).select(colStats ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS),
+ reader.filterRows(rowFilter)
+ .caseSensitive(caseSensitive)
+ .select(colStats ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS),
file -> new BaseFileScanTask(file, schemaString, specString, residuals)
);
});
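Case sensitivity now rides on the filtered view rather than on the reader itself. The new call shape, sketched (assumes inputFile, specLookup, rowFilter, and the SCAN_COLUMNS constant above):

    // caseSensitive(boolean) is applied to the FilteredManifest after
    // filterRows, instead of when the ManifestReader is constructed.
    FilteredManifest filtered = ManifestReader.read(inputFile, specLookup)
        .filterRows(rowFilter)
        .caseSensitive(caseSensitive)
        .select(SCAN_COLUMNS);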
diff --git a/core/src/main/java/org/apache/iceberg/FilteredManifest.java b/core/src/main/java/org/apache/iceberg/FilteredManifest.java
index 73bb86f..31cc11b 100644
--- a/core/src/main/java/org/apache/iceberg/FilteredManifest.java
+++ b/core/src/main/java/org/apache/iceberg/FilteredManifest.java
@@ -43,6 +43,7 @@ public class FilteredManifest implements Filterable<FilteredManifest> {
private final ManifestReader reader;
private final Expression partFilter;
private final Expression rowFilter;
+ private final Schema fileSchema;
private final Collection<String> columns;
private final boolean caseSensitive;
@@ -51,37 +52,41 @@ public class FilteredManifest implements Filterable<FilteredManifest> {
private InclusiveMetricsEvaluator lazyMetricsEvaluator = null;
FilteredManifest(ManifestReader reader, Expression partFilter, Expression rowFilter,
- Collection<String> columns, boolean caseSensitive) {
+ Schema fileSchema, Collection<String> columns, boolean caseSensitive) {
Preconditions.checkNotNull(reader, "ManifestReader cannot be null");
this.reader = reader;
this.partFilter = partFilter;
this.rowFilter = rowFilter;
+ this.fileSchema = fileSchema;
this.columns = columns;
this.caseSensitive = caseSensitive;
}
@Override
public FilteredManifest select(Collection<String> selectedColumns) {
- return new FilteredManifest(reader, partFilter, rowFilter, selectedColumns, caseSensitive);
+ return new FilteredManifest(reader, partFilter, rowFilter, fileSchema, selectedColumns, caseSensitive);
+ }
+
+ @Override
+ public FilteredManifest project(Schema fileProjection) {
+ return new FilteredManifest(reader, partFilter, rowFilter, fileProjection, columns, caseSensitive);
}
@Override
public FilteredManifest filterPartitions(Expression expr) {
- return new FilteredManifest(reader,
- Expressions.and(partFilter, expr),
- rowFilter,
- columns,
- caseSensitive);
+ return new FilteredManifest(
+ reader, Expressions.and(partFilter, expr), rowFilter, fileSchema, columns, caseSensitive);
}
@Override
public FilteredManifest filterRows(Expression expr) {
- Expression projected = Projections.inclusive(reader.spec(), caseSensitive).project(expr);
- return new FilteredManifest(reader,
- Expressions.and(partFilter, projected),
- Expressions.and(rowFilter, expr),
- columns,
- caseSensitive);
+ return new FilteredManifest(
+ reader, partFilter, Expressions.and(rowFilter, expr), fileSchema, columns, caseSensitive);
+ }
+
+ @Override
+ public FilteredManifest caseSensitive(boolean isCaseSensitive) {
+ return new FilteredManifest(reader, partFilter, rowFilter, fileSchema, columns, isCaseSensitive);
}
CloseableIterable<ManifestEntry> allEntries() {
@@ -90,13 +95,13 @@ public class FilteredManifest implements Filterable<FilteredManifest> {
Evaluator evaluator = evaluator();
InclusiveMetricsEvaluator metricsEvaluator = metricsEvaluator();
- return CloseableIterable.filter(reader.entries(columns),
+ return CloseableIterable.filter(reader.entries(projection(fileSchema, columns, caseSensitive)),
entry -> entry != null &&
evaluator.eval(entry.file().partition()) &&
metricsEvaluator.eval(entry.file()));
} else {
- return reader.entries(columns);
+ return reader.entries(projection(fileSchema, columns, caseSensitive));
}
}
@@ -106,14 +111,14 @@ public class FilteredManifest implements Filterable<FilteredManifest> {
Evaluator evaluator = evaluator();
InclusiveMetricsEvaluator metricsEvaluator = metricsEvaluator();
- return CloseableIterable.filter(reader.entries(columns),
+ return CloseableIterable.filter(reader.entries(projection(fileSchema, columns, caseSensitive)),
entry -> entry != null &&
entry.status() != Status.DELETED &&
evaluator.eval(entry.file().partition()) &&
metricsEvaluator.eval(entry.file()));
} else {
- return CloseableIterable.filter(reader.entries(columns),
+ return CloseableIterable.filter(reader.entries(projection(fileSchema, columns, caseSensitive)),
entry -> entry != null && entry.status() != Status.DELETED);
}
}
@@ -133,14 +138,16 @@ public class FilteredManifest implements Filterable<FilteredManifest> {
boolean dropStats = Sets.intersection(Sets.newHashSet(columns), STATS_COLUMNS).isEmpty();
return Iterators.transform(
- Iterators.filter(reader.iterator(partFilter, projectColumns),
+ Iterators.filter(reader.iterator(partFilter, projection(fileSchema, projectColumns, caseSensitive)),
input -> input != null &&
evaluator.eval(input.partition()) &&
metricsEvaluator.eval(input)),
dropStats ? DataFile::copyWithoutStats : DataFile::copy);
} else {
- return Iterators.transform(reader.iterator(partFilter, columns), DataFile::copy);
+ return Iterators.transform(
+ reader.iterator(partFilter, projection(fileSchema, columns, caseSensitive)),
+ DataFile::copy);
}
}
@@ -149,10 +156,24 @@ public class FilteredManifest implements Filterable<FilteredManifest> {
reader.close();
}
+ private static Schema projection(Schema fileSchema, Collection<String> columns, boolean caseSensitive) {
+ if (columns != null) {
+ if (caseSensitive) {
+ return fileSchema.select(columns);
+ } else {
+ return fileSchema.caseInsensitiveSelect(columns);
+ }
+ }
+
+ return fileSchema;
+ }
+
private Evaluator evaluator() {
if (lazyEvaluator == null) {
- if (partFilter != null) {
- this.lazyEvaluator = new Evaluator(reader.spec().partitionType(), partFilter, caseSensitive);
+ Expression projected = Projections.inclusive(reader.spec(), caseSensitive).project(rowFilter);
+ Expression finalPartFilter = Expressions.and(projected, partFilter);
+ if (finalPartFilter != null) {
+ this.lazyEvaluator = new Evaluator(reader.spec().partitionType(), finalPartFilter, caseSensitive);
} else {
this.lazyEvaluator = new Evaluator(reader.spec().partitionType(), Expressions.alwaysTrue(), caseSensitive);
}
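Two details above are easy to miss: column selection now resolves against the file schema, with optional case-insensitive matching, and the inclusive partition projection of the row filter moved from filterRows into the lazy evaluator(). The projection helper's effect, sketched (assumes fields named "id" and "data"):

    // With caseSensitive = false, selecting "ID" still resolves to "id".
    Schema proj = caseSensitive
        ? fileSchema.select(columns)                 // exact-name matching
        : fileSchema.caseInsensitiveSelect(columns); // case-insensitive matching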
diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
index 848b0c3..f3fe85e 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
@@ -19,11 +19,13 @@
package org.apache.iceberg;
+import com.google.common.collect.Sets;
import java.util.Collection;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.types.TypeUtil;
/**
* A {@link Table} implementation that exposes a table's manifest entries as rows.
@@ -52,12 +54,18 @@ class ManifestEntriesTable extends BaseMetadataTable {
@Override
public TableScan newScan() {
- return new EntriesTableScan(ops, table);
+ return new EntriesTableScan(ops, table, schema());
}
@Override
public Schema schema() {
- return ManifestEntry.getSchema(table.spec().partitionType());
+ Schema schema = ManifestEntry.getSchema(table.spec().partitionType());
+ if (table.spec().fields().size() < 1) {
+ // avoid returning an empty struct, which is not always supported. instead, drop the partition field (id 102)
+ return TypeUtil.selectNot(schema, Sets.newHashSet(102));
+ } else {
+ return schema;
+ }
}
@Override
@@ -68,8 +76,8 @@ class ManifestEntriesTable extends BaseMetadataTable {
private static class EntriesTableScan extends BaseTableScan {
private static final long TARGET_SPLIT_SIZE = 32 * 1024 * 1024; // 32 MB
- EntriesTableScan(TableOperations ops, Table table) {
- super(ops, table, ManifestEntry.getSchema(table.spec().partitionType()));
+ EntriesTableScan(TableOperations ops, Table table, Schema schema) {
+ super(ops, table, schema);
}
private EntriesTableScan(
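The entries table gets the same empty-struct fix as the files table. For reference, a sketch of querying it (illustrative; "db.table" is a placeholder, and the status encoding follows the new test below):

    Dataset<Row> entries = spark.read()
        .format("iceberg")
        .load("db.table.entries");
    // keep live entries: 0 = existing, 1 = added; 2 = deleted
    entries.filter("status < 2").show();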
diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntry.java b/core/src/main/java/org/apache/iceberg/ManifestEntry.java
index 584f007..c296c97 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestEntry.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestEntry.java
@@ -166,7 +166,7 @@ class ManifestEntry implements IndexedRecord, SpecificData.SchemaConstructable {
new Schema(DataFile.getType(partitionType).fields()).select(columns).asStruct());
}
- private static Schema wrapFileSchema(StructType fileStruct) {
+ static Schema wrapFileSchema(StructType fileStruct) {
// ids for top-level columns are assigned from 1000
return new Schema(
required(0, "status", IntegerType.get()),
diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java
index 781352e..dceac87 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -32,7 +32,6 @@ import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.avro.AvroIterable;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.expressions.Projections;
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
@@ -73,8 +72,7 @@ public class ManifestReader extends CloseableGroup implements Filterable<Filtere
private final InputFile file;
private final Map<String, String> metadata;
private final PartitionSpec spec;
- private final Schema schema;
- private final boolean caseSensitive;
+ private final Schema fileSchema;
// lazily initialized
private List<ManifestEntry> cachedAdds = null;
@@ -101,34 +99,12 @@ public class ManifestReader extends CloseableGroup implements Filterable<Filtere
if (specLookup != null) {
this.spec = specLookup.apply(specId);
- this.schema = spec.schema();
} else {
- this.schema = SchemaParser.fromJson(metadata.get("schema"));
+ Schema schema = SchemaParser.fromJson(metadata.get("schema"));
this.spec = PartitionSpecParser.fromJsonFields(schema, specId, metadata.get("partition-spec"));
}
- this.caseSensitive = true;
- }
-
- private ManifestReader(InputFile file, Map<String, String> metadata,
- PartitionSpec spec, Schema schema, boolean caseSensitive) {
- this.file = file;
- this.metadata = metadata;
- this.spec = spec;
- this.schema = schema;
- this.caseSensitive = caseSensitive;
- }
-
- /**
- * Returns a new {@link ManifestReader} that, if filtered via {@link #select(java.util.Collection)},
- * {@link #filterPartitions(Expression)} or {@link #filterRows(Expression)}, will apply the specified
- * case sensitivity for column name matching.
- *
- * @param readCaseSensitive whether column name matching should have case sensitivity
- * @return a manifest reader with case sensitivity as stated
- */
- public ManifestReader caseSensitive(boolean readCaseSensitive) {
- return new ManifestReader(file, metadata, spec, schema, readCaseSensitive);
+ this.fileSchema = new Schema(DataFile.getType(spec.partitionType()).fields());
}
public InputFile file() {
@@ -136,7 +112,7 @@ public class ManifestReader extends CloseableGroup implements Filterable<Filtere
}
public Schema schema() {
- return schema;
+ return fileSchema;
}
public PartitionSpec spec() {
@@ -145,21 +121,26 @@ public class ManifestReader extends CloseableGroup implements Filterable<Filtere
@Override
public FilteredManifest select(Collection<String> columns) {
- return new FilteredManifest(this, alwaysTrue(), alwaysTrue(), Lists.newArrayList(columns), caseSensitive);
+ return new FilteredManifest(this, alwaysTrue(), alwaysTrue(), fileSchema, columns, true);
+ }
+
+ @Override
+ public FilteredManifest project(Schema fileProjection) {
+ return new FilteredManifest(this, alwaysTrue(), alwaysTrue(), fileProjection, ALL_COLUMNS, true);
}
@Override
public FilteredManifest filterPartitions(Expression expr) {
- return new FilteredManifest(this, expr, alwaysTrue(), ALL_COLUMNS, caseSensitive);
+ return new FilteredManifest(this, expr, alwaysTrue(), fileSchema, ALL_COLUMNS, true);
}
@Override
public FilteredManifest filterRows(Expression expr) {
- return new FilteredManifest(this,
- Projections.inclusive(spec, caseSensitive).project(expr),
- expr,
- ALL_COLUMNS,
- caseSensitive);
+ return new FilteredManifest(this, alwaysTrue(), expr, fileSchema, ALL_COLUMNS, true);
+ }
+
+ public FilteredManifest caseSensitive(boolean caseSensitive) {
+ return new FilteredManifest(this, alwaysTrue(), alwaysTrue(), fileSchema, ALL_COLUMNS, caseSensitive);
}
public List<ManifestEntry> addedFiles() {
@@ -180,7 +161,7 @@ public class ManifestReader extends CloseableGroup implements Filterable<Filtere
List<ManifestEntry> adds = Lists.newArrayList();
List<ManifestEntry> deletes = Lists.newArrayList();
- for (ManifestEntry entry : entries(CHANGE_COLUNNS)) {
+ for (ManifestEntry entry : entries(fileSchema.select(CHANGE_COLUNNS))) {
switch (entry.status()) {
case ADDED:
adds.add(entry.copyWithoutStats());
@@ -197,18 +178,17 @@ public class ManifestReader extends CloseableGroup implements Filterable<Filtere
}
CloseableIterable<ManifestEntry> entries() {
- return entries(ALL_COLUMNS);
+ return entries(fileSchema);
}
- CloseableIterable<ManifestEntry> entries(Collection<String> columns) {
+ CloseableIterable<ManifestEntry> entries(Schema fileProjection) {
FileFormat format = FileFormat.fromFileName(file.location());
Preconditions.checkArgument(format != null, "Unable to determine format of manifest: %s", file);
- Schema projectedSchema = ManifestEntry.projectSchema(spec.partitionType(), columns);
switch (format) {
case AVRO:
AvroIterable<ManifestEntry> reader = Avro.read(file)
- .project(projectedSchema)
+ .project(ManifestEntry.wrapFileSchema(fileProjection.asStruct()))
.rename("manifest_entry", ManifestEntry.class.getName())
.rename("partition", PartitionData.class.getName())
.rename("r102", PartitionData.class.getName())
@@ -228,13 +208,13 @@ public class ManifestReader extends CloseableGroup implements Filterable<Filtere
@Override
public Iterator<DataFile> iterator() {
- return iterator(alwaysTrue(), ALL_COLUMNS);
+ return iterator(alwaysTrue(), fileSchema);
}
// visible for use by PartialManifest
- Iterator<DataFile> iterator(Expression partFilter, Collection<String> columns) {
+ Iterator<DataFile> iterator(Expression partFilter, Schema fileProjection) {
return Iterables.transform(Iterables.filter(
- entries(columns),
+ entries(fileProjection),
entry -> entry.status() != ManifestEntry.Status.DELETED),
ManifestEntry::file).iterator();
}
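With entries() keyed by a projection schema instead of column names, a caller's DataFile projection is wrapped into a manifest_entry schema via wrapFileSchema before being handed to Avro. A usage sketch (assumes io, manifestPath, and a projection Schema fileProjection):

    // Only the projected DataFile columns are read from the manifest.
    try (FilteredManifest filtered = ManifestReader
        .read(io.newInputFile(manifestPath))
        .project(fileProjection)) {
      for (DataFile file : filtered) {
        // fields outside fileProjection are not loaded
      }
    }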
diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
index e0b2851..91359ff 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java
@@ -506,9 +506,10 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD
private CloseableIterable<InternalRow> newDataIterable(DataTask task, Schema readSchema) {
StructInternalRow row = new StructInternalRow(tableSchema.asStruct());
- Iterable<InternalRow> asSparkRows = Iterables.transform(task.asDataTask().rows(), row::setStruct);
- return CloseableIterable.withNoopClose(
- Iterables.transform(asSparkRows, APPLY_PROJECTION.bind(projection(readSchema, tableSchema))::invoke));
+ CloseableIterable<InternalRow> asSparkRows = CloseableIterable.transform(
+ task.asDataTask().rows(), row::setStruct);
+ return CloseableIterable.transform(
+ asSparkRows, APPLY_PROJECTION.bind(projection(readSchema, tableSchema))::invoke);
}
}
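The reader change replaces Iterables.transform plus withNoopClose with CloseableIterable.transform, which propagates close() to the source, so closing the Spark row iterable now also closes the data task's underlying reader. Sketched (illustrative; assumes IOException may propagate here):

    try (CloseableIterable<InternalRow> rows = CloseableIterable.transform(
        task.asDataTask().rows(), row::setStruct)) {
      for (InternalRow r : rows) {
        // consume r; closing rows also closes the wrapped rows() iterable
      }
    }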
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
index 5a50731..81e2326 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java
@@ -30,6 +30,7 @@ import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
@@ -217,6 +218,59 @@ public class TestIcebergSourceHiveTables {
}
@Test
+ public void testHiveFilesUnpartitionedTable() throws TException, IOException {
+ try (HiveCatalog catalog = new HiveCatalog(hiveConf)) {
+ Table table = catalog.createTable(TABLE_IDENTIFIER, SCHEMA);
+ Table entriesTable = catalog.loadTable(TableIdentifier.of(DB_NAME, TABLE_NAME, "entries"));
+ Table filesTable = catalog.loadTable(TableIdentifier.of(DB_NAME, TABLE_NAME, "files"));
+
+ Dataset<Row> df1 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class);
+ Dataset<Row> df2 = spark.createDataFrame(Lists.newArrayList(new SimpleRecord(2, "b")), SimpleRecord.class);
+
+ df1.select("id", "data").write()
+ .format("iceberg")
+ .mode("append")
+ .save(TABLE_IDENTIFIER.toString());
+
+ table.refresh();
+ DataFile toDelete = Iterables.getOnlyElement(table.currentSnapshot().addedFiles());
+
+ // add a second file
+ df2.select("id", "data").write()
+ .format("iceberg")
+ .mode("append")
+ .save(TABLE_IDENTIFIER.toString());
+
+ // delete the first file to test that only live files are listed
+ table.newDelete().deleteFile(toDelete).commit();
+
+ List<Row> actual = spark.read()
+ .format("iceberg")
+ .load(DB_NAME + "." + TABLE_NAME + ".files")
+ .collectAsList();
+
+ List<GenericData.Record> expected = Lists.newArrayList();
+ for (ManifestFile manifest : table.currentSnapshot().manifests()) {
+ InputFile in = table.io().newInputFile(manifest.path());
+ try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
+ for (GenericData.Record record : rows) {
+ if ((Integer) record.get("status") < 2 /* added or existing */) {
+ expected.add((GenericData.Record) record.get("data_file"));
+ }
+ }
+ }
+ }
+
+ Assert.assertEquals("Files table should have one row", 1, expected.size());
+ Assert.assertEquals("Actual results should have one row", 1, actual.size());
+ TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(0), actual.get(0));
+
+ } finally {
+ metastoreClient.dropTable(DB_NAME, TABLE_NAME);
+ }
+ }
+
+ @Test
public void testHiveHistoryTable() throws TException {
try (HiveCatalog catalog = new HiveCatalog(hiveConf)) {
Table table = catalog.createTable(TABLE_IDENTIFIER, SCHEMA, PartitionSpec.unpartitioned());