You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by dw...@apache.org on 2020/06/09 17:43:41 UTC
[iceberg] branch master updated: Combine ManifestReaders into one parameterized class. (#1099)
This is an automated email from the ASF dual-hosted git repository.
dweeks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 2ac53b3 Combine ManifestReaders into one parameterized class. (#1099)
2ac53b3 is described below
commit 2ac53b346c8a1b4cfac42aa8b5811b120b4cf0f0
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Tue Jun 9 10:43:32 2020 -0700
Combine ManifestReaders into one parameterized class. (#1099)
---
.../org/apache/iceberg/BaseManifestReader.java | 294 ---------------------
.../org/apache/iceberg/BaseMetastoreCatalog.java | 2 +-
.../org/apache/iceberg/BaseRewriteManifests.java | 2 +-
.../org/apache/iceberg/DeleteManifestReader.java | 40 ---
.../java/org/apache/iceberg/ManifestFiles.java | 43 ++-
.../java/org/apache/iceberg/ManifestGroup.java | 4 +-
.../java/org/apache/iceberg/ManifestReader.java | 269 ++++++++++++++++++-
.../apache/iceberg/MergingSnapshotProducer.java | 6 +-
.../java/org/apache/iceberg/RemoveSnapshots.java | 17 +-
.../java/org/apache/iceberg/SnapshotProducer.java | 2 +-
.../org/apache/iceberg/TestManifestReader.java | 6 +-
.../org/apache/iceberg/TestRewriteManifests.java | 14 +-
.../java/org/apache/iceberg/TestTransaction.java | 2 +-
.../apache/iceberg/spark/TestSparkDataFile.java | 2 +-
14 files changed, 313 insertions(+), 390 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/BaseManifestReader.java b/core/src/main/java/org/apache/iceberg/BaseManifestReader.java
deleted file mode 100644
index 7217fc0..0000000
--- a/core/src/main/java/org/apache/iceberg/BaseManifestReader.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.avro.AvroIterable;
-import org.apache.iceberg.exceptions.RuntimeIOException;
-import org.apache.iceberg.expressions.Evaluator;
-import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.expressions.InclusiveMetricsEvaluator;
-import org.apache.iceberg.expressions.Projections;
-import org.apache.iceberg.io.CloseableGroup;
-import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.CloseableIterator;
-import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.types.Types;
-
-import static org.apache.iceberg.expressions.Expressions.alwaysTrue;
-
-/**
- * Base reader for data and delete manifest files.
- *
- * @param <F> The Java class of files returned by this reader.
- * @param <ThisT> The Java class of this reader, returned by configuration methods.
- */
-abstract class BaseManifestReader<F extends ContentFile<F>, ThisT>
- extends CloseableGroup implements CloseableIterable<F> {
- static final ImmutableList<String> ALL_COLUMNS = ImmutableList.of("*");
- private static final Set<String> STATS_COLUMNS = Sets.newHashSet(
- "value_counts", "null_value_counts", "lower_bounds", "upper_bounds");
-
- protected enum FileType {
- DATA_FILES(GenericDataFile.class.getName()),
- DELETE_FILES(GenericDeleteFile.class.getName());
-
- private final String fileClass;
-
- FileType(String fileClass) {
- this.fileClass = fileClass;
- }
-
- private String fileClass() {
- return fileClass;
- }
- }
-
- private final InputFile file;
- private final InheritableMetadata inheritableMetadata;
- private final FileType content;
- private final Map<String, String> metadata;
- private final PartitionSpec spec;
- private final Schema fileSchema;
-
- // updated by configuration methods
- private Expression partFilter = alwaysTrue();
- private Expression rowFilter = alwaysTrue();
- private Schema fileProjection = null;
- private Collection<String> columns = null;
- private boolean caseSensitive = true;
-
- // lazily initialized
- private Evaluator lazyEvaluator = null;
- private InclusiveMetricsEvaluator lazyMetricsEvaluator = null;
-
- protected BaseManifestReader(InputFile file, Map<Integer, PartitionSpec> specsById,
- InheritableMetadata inheritableMetadata, FileType content) {
- this.file = file;
- this.inheritableMetadata = inheritableMetadata;
- this.content = content;
-
- try {
- try (AvroIterable<ManifestEntry<F>> headerReader = Avro.read(file)
- .project(ManifestEntry.getSchema(Types.StructType.of()).select("status"))
- .build()) {
- this.metadata = headerReader.getMetadata();
- }
- } catch (IOException e) {
- throw new RuntimeIOException(e);
- }
-
- int specId = TableMetadata.INITIAL_SPEC_ID;
- String specProperty = metadata.get("partition-spec-id");
- if (specProperty != null) {
- specId = Integer.parseInt(specProperty);
- }
-
- if (specsById != null) {
- this.spec = specsById.get(specId);
- } else {
- Schema schema = SchemaParser.fromJson(metadata.get("schema"));
- this.spec = PartitionSpecParser.fromJsonFields(schema, specId, metadata.get("partition-spec"));
- }
-
- this.fileSchema = new Schema(DataFile.getType(spec.partitionType()).fields());
- }
-
- protected abstract ThisT self();
-
- public InputFile file() {
- return file;
- }
-
- public Schema schema() {
- return fileSchema;
- }
-
- public PartitionSpec spec() {
- return spec;
- }
-
- public ThisT select(Collection<String> newColumns) {
- Preconditions.checkState(fileProjection == null,
- "Cannot select columns using both select(String...) and project(Schema)");
- this.columns = newColumns;
- return self();
- }
-
- public ThisT project(Schema newFileProjection) {
- Preconditions.checkState(columns == null,
- "Cannot select columns using both select(String...) and project(Schema)");
- this.fileProjection = newFileProjection;
- return self();
- }
-
- public ThisT filterPartitions(Expression expr) {
- this.partFilter = Expressions.and(partFilter, expr);
- return self();
- }
-
- public ThisT filterRows(Expression expr) {
- this.rowFilter = Expressions.and(rowFilter, expr);
- return self();
- }
-
- public ThisT caseSensitive(boolean isCaseSensitive) {
- this.caseSensitive = isCaseSensitive;
- return self();
- }
-
- CloseableIterable<ManifestEntry<F>> entries() {
- if ((rowFilter != null && rowFilter != Expressions.alwaysTrue()) ||
- (partFilter != null && partFilter != Expressions.alwaysTrue())) {
- Evaluator evaluator = evaluator();
- InclusiveMetricsEvaluator metricsEvaluator = metricsEvaluator();
-
- // ensure stats columns are present for metrics evaluation
- boolean requireStatsProjection = requireStatsProjection(rowFilter, columns);
- Collection<String> projectColumns = requireStatsProjection ? withStatsColumns(columns) : columns;
-
- return CloseableIterable.filter(
- open(projection(fileSchema, fileProjection, projectColumns, caseSensitive)),
- entry -> entry != null &&
- evaluator.eval(entry.file().partition()) &&
- metricsEvaluator.eval(entry.file()));
- } else {
- return open(projection(fileSchema, fileProjection, columns, caseSensitive));
- }
- }
-
- private CloseableIterable<ManifestEntry<F>> open(Schema projection) {
- FileFormat format = FileFormat.fromFileName(file.location());
- Preconditions.checkArgument(format != null, "Unable to determine format of manifest: %s", file);
-
- switch (format) {
- case AVRO:
- AvroIterable<ManifestEntry<F>> reader = Avro.read(file)
- .project(ManifestEntry.wrapFileSchema(projection.asStruct()))
- .rename("manifest_entry", GenericManifestEntry.class.getName())
- .rename("partition", PartitionData.class.getName())
- .rename("r102", PartitionData.class.getName())
- .rename("data_file", content.fileClass())
- .rename("r2", content.fileClass())
- .classLoader(GenericManifestEntry.class.getClassLoader())
- .reuseContainers()
- .build();
-
- addCloseable(reader);
-
- return CloseableIterable.transform(reader, inheritableMetadata::apply);
-
- default:
- throw new UnsupportedOperationException("Invalid format for manifest file: " + format);
- }
- }
-
- CloseableIterable<ManifestEntry<F>> liveEntries() {
- return CloseableIterable.filter(entries(),
- entry -> entry != null && entry.status() != ManifestEntry.Status.DELETED);
- }
-
- /**
- * @return an Iterator of DataFile. Makes defensive copies of files before returning
- */
- @Override
- public CloseableIterator<F> iterator() {
- if (dropStats(rowFilter, columns)) {
- return CloseableIterable.transform(liveEntries(), e -> e.file().copyWithoutStats()).iterator();
- } else {
- return CloseableIterable.transform(liveEntries(), e -> e.file().copy()).iterator();
- }
- }
-
- private static Schema projection(Schema schema, Schema project, Collection<String> columns, boolean caseSensitive) {
- if (columns != null) {
- if (caseSensitive) {
- return schema.select(columns);
- } else {
- return schema.caseInsensitiveSelect(columns);
- }
- } else if (project != null) {
- return project;
- }
-
- return schema;
- }
-
- private Evaluator evaluator() {
- if (lazyEvaluator == null) {
- Expression projected = Projections.inclusive(spec, caseSensitive).project(rowFilter);
- Expression finalPartFilter = Expressions.and(projected, partFilter);
- if (finalPartFilter != null) {
- this.lazyEvaluator = new Evaluator(spec.partitionType(), finalPartFilter, caseSensitive);
- } else {
- this.lazyEvaluator = new Evaluator(spec.partitionType(), Expressions.alwaysTrue(), caseSensitive);
- }
- }
- return lazyEvaluator;
- }
-
- private InclusiveMetricsEvaluator metricsEvaluator() {
- if (lazyMetricsEvaluator == null) {
- if (rowFilter != null) {
- this.lazyMetricsEvaluator = new InclusiveMetricsEvaluator(
- spec.schema(), rowFilter, caseSensitive);
- } else {
- this.lazyMetricsEvaluator = new InclusiveMetricsEvaluator(
- spec.schema(), Expressions.alwaysTrue(), caseSensitive);
- }
- }
- return lazyMetricsEvaluator;
- }
-
- private static boolean requireStatsProjection(Expression rowFilter, Collection<String> columns) {
- // Make sure we have all stats columns for metrics evaluator
- return rowFilter != Expressions.alwaysTrue() &&
- !columns.containsAll(BaseManifestReader.ALL_COLUMNS) &&
- !columns.containsAll(STATS_COLUMNS);
- }
-
- static boolean dropStats(Expression rowFilter, Collection<String> columns) {
- // Make sure we only drop all stats if we had projected all stats
- // We do not drop stats even if we had partially added some stats columns
- return rowFilter != Expressions.alwaysTrue() &&
- !columns.containsAll(BaseManifestReader.ALL_COLUMNS) &&
- Sets.intersection(Sets.newHashSet(columns), STATS_COLUMNS).isEmpty();
- }
-
- private static Collection<String> withStatsColumns(Collection<String> columns) {
- if (columns.containsAll(BaseManifestReader.ALL_COLUMNS)) {
- return columns;
- } else {
- List<String> projectColumns = Lists.newArrayList(columns);
- projectColumns.addAll(STATS_COLUMNS); // order doesn't matter
- return projectColumns;
- }
- }
-}
diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
index cabf648..4a077f3 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
@@ -262,7 +262,7 @@ public abstract class BaseMetastoreCatalog implements Catalog {
.executeWith(ThreadPools.getWorkerPool())
.onFailure((item, exc) -> LOG.warn("Failed to get deleted files: this may cause orphaned data files", exc))
.run(manifest -> {
- try (ManifestReader reader = ManifestFiles.read(manifest, io)) {
+ try (ManifestReader<?> reader = ManifestFiles.open(manifest, io)) {
for (ManifestEntry<?> entry : reader.entries()) {
// intern the file path because the weak key map uses identity (==) instead of equals
String path = entry.file().path().toString().intern();
diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
index d49b60a..de0a785 100644
--- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
+++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
@@ -235,7 +235,7 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests> imp
keptManifests.add(manifest);
} else {
rewrittenManifests.add(manifest);
- try (ManifestReader reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())
+ try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())
.select(Arrays.asList("*"))) {
reader.liveEntries().forEach(
entry -> appendEntry(entry, clusterByFunc.apply(entry.file()), manifest.partitionSpecId())
diff --git a/core/src/main/java/org/apache/iceberg/DeleteManifestReader.java b/core/src/main/java/org/apache/iceberg/DeleteManifestReader.java
deleted file mode 100644
index dfeee6b..0000000
--- a/core/src/main/java/org/apache/iceberg/DeleteManifestReader.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg;
-
-import java.util.Map;
-import org.apache.iceberg.io.FileIO;
-import org.apache.iceberg.io.InputFile;
-
-/**
- * Reader for manifest files.
- * <p>
- * Create readers using {@link ManifestFiles#readDeleteManifest(ManifestFile, FileIO, Map)}.
- */
-public class DeleteManifestReader extends BaseManifestReader<DeleteFile, DeleteManifestReader> {
- protected DeleteManifestReader(InputFile file, Map<Integer, PartitionSpec> specsById, InheritableMetadata metadata) {
- super(file, specsById, metadata, FileType.DELETE_FILES);
- }
-
- @Override
- protected DeleteManifestReader self() {
- return this;
- }
-}
diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
index 730badf..809d644 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
@@ -21,6 +21,7 @@ package org.apache.iceberg;
import java.io.IOException;
import java.util.Map;
+import org.apache.iceberg.ManifestReader.FileType;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
@@ -42,7 +43,7 @@ public class ManifestFiles {
* @param io a FileIO
* @return a manifest reader
*/
- public static ManifestReader read(ManifestFile manifest, FileIO io) {
+ public static ManifestReader<DataFile> read(ManifestFile manifest, FileIO io) {
return read(manifest, io, null);
}
@@ -54,12 +55,12 @@ public class ManifestFiles {
* @param specsById a Map from spec ID to partition spec
* @return a {@link ManifestReader}
*/
- public static ManifestReader read(ManifestFile manifest, FileIO io, Map<Integer, PartitionSpec> specsById) {
+ public static ManifestReader<DataFile> read(ManifestFile manifest, FileIO io, Map<Integer, PartitionSpec> specsById) {
Preconditions.checkArgument(manifest.content() == ManifestContent.DATA,
"Cannot read a delete manifest with a ManifestReader: %s", manifest);
InputFile file = io.newInputFile(manifest.path());
InheritableMetadata inheritableMetadata = InheritableMetadataFactory.fromManifest(manifest);
- return new ManifestReader(file, specsById, inheritableMetadata);
+ return new ManifestReader<>(file, specsById, inheritableMetadata, FileType.DATA_FILES);
}
/**
@@ -97,20 +98,20 @@ public class ManifestFiles {
}
/**
- * Returns a new {@link DeleteManifestReader} for a {@link ManifestFile}.
+ * Returns a new {@link ManifestReader} for a {@link ManifestFile}.
*
* @param manifest a {@link ManifestFile}
* @param io a {@link FileIO}
* @param specsById a Map from spec ID to partition spec
- * @return a {@link DeleteManifestReader}
+ * @return a {@link ManifestReader}
*/
- public static DeleteManifestReader readDeleteManifest(ManifestFile manifest, FileIO io,
- Map<Integer, PartitionSpec> specsById) {
+ public static ManifestReader<DeleteFile> readDeleteManifest(ManifestFile manifest, FileIO io,
+ Map<Integer, PartitionSpec> specsById) {
Preconditions.checkArgument(manifest.content() == ManifestContent.DELETES,
"Cannot read a data manifest with a DeleteManifestReader: %s", manifest);
InputFile file = io.newInputFile(manifest.path());
InheritableMetadata inheritableMetadata = InheritableMetadataFactory.fromManifest(manifest);
- return new DeleteManifestReader(file, specsById, inheritableMetadata);
+ return new ManifestReader<>(file, specsById, inheritableMetadata, FileType.DELETE_FILES);
}
/**
@@ -133,13 +134,29 @@ public class ManifestFiles {
throw new UnsupportedOperationException("Cannot write manifest for table version: " + formatVersion);
}
+ static ManifestReader<?> open(ManifestFile manifest, FileIO io) {
+ return open(manifest, io, null);
+ }
+
+ static ManifestReader<?> open(ManifestFile manifest, FileIO io,
+ Map<Integer, PartitionSpec> specsById) {
+ switch (manifest.content()) {
+ case DATA:
+ return ManifestFiles.read(manifest, io, specsById);
+ case DELETES:
+ return ManifestFiles.readDeleteManifest(manifest, io, specsById);
+ }
+ throw new UnsupportedOperationException("Cannot read unknown manifest type: " + manifest.content());
+ }
+
static ManifestFile copyAppendManifest(int formatVersion,
InputFile toCopy, Map<Integer, PartitionSpec> specsById,
OutputFile outputFile, long snapshotId,
SnapshotSummary.Builder summaryBuilder) {
// use metadata that will add the current snapshot's ID for the rewrite
InheritableMetadata inheritableMetadata = InheritableMetadataFactory.forCopy(snapshotId);
- try (ManifestReader reader = new ManifestReader(toCopy, specsById, inheritableMetadata)) {
+ try (ManifestReader<DataFile> reader =
+ new ManifestReader<>(toCopy, specsById, inheritableMetadata, FileType.DATA_FILES)) {
return copyManifestInternal(
formatVersion, reader, outputFile, snapshotId, summaryBuilder, ManifestEntry.Status.ADDED);
} catch (IOException e) {
@@ -153,7 +170,8 @@ public class ManifestFiles {
SnapshotSummary.Builder summaryBuilder) {
// for a rewritten manifest all snapshot ids should be set. use empty metadata to throw an exception if it is not
InheritableMetadata inheritableMetadata = InheritableMetadataFactory.empty();
- try (ManifestReader reader = new ManifestReader(toCopy, specsById, inheritableMetadata)) {
+ try (ManifestReader<DataFile> reader =
+ new ManifestReader<>(toCopy, specsById, inheritableMetadata, FileType.DATA_FILES)) {
return copyManifestInternal(
formatVersion, reader, outputFile, snapshotId, summaryBuilder, ManifestEntry.Status.EXISTING);
} catch (IOException e) {
@@ -161,8 +179,9 @@ public class ManifestFiles {
}
}
- private static ManifestFile copyManifestInternal(int formatVersion, ManifestReader reader, OutputFile outputFile,
- long snapshotId, SnapshotSummary.Builder summaryBuilder,
+ private static ManifestFile copyManifestInternal(int formatVersion, ManifestReader<DataFile> reader,
+ OutputFile outputFile, long snapshotId,
+ SnapshotSummary.Builder summaryBuilder,
ManifestEntry.Status allowedEntryStatus) {
ManifestWriter<DataFile> writer = write(formatVersion, reader.spec(), outputFile, snapshotId);
boolean threw = true;
diff --git a/core/src/main/java/org/apache/iceberg/ManifestGroup.java b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
index 6308236..bc7114b 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestGroup.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
@@ -68,7 +68,7 @@ class ManifestGroup {
this.ignoreDeleted = false;
this.ignoreExisting = false;
this.ignoreResiduals = false;
- this.columns = BaseManifestReader.ALL_COLUMNS;
+ this.columns = ManifestReader.ALL_COLUMNS;
this.caseSensitive = true;
this.manifestPredicate = m -> true;
this.manifestEntryPredicate = e -> true;
@@ -146,7 +146,7 @@ class ManifestGroup {
Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : dataFilter;
return ResidualEvaluator.of(spec, filter, caseSensitive);
});
- boolean dropStats = BaseManifestReader.dropStats(dataFilter, columns);
+ boolean dropStats = ManifestReader.dropStats(dataFilter, columns);
Iterable<CloseableIterable<FileScanTask>> tasks = entries((manifest, entries) -> {
int partitionSpecId = manifest.partitionSpecId();
PartitionSpec spec = specsById.get(partitionSpecId);
diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java
index 45c19fc..bd3801c 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -19,22 +19,273 @@
package org.apache.iceberg;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
import java.util.Map;
-import org.apache.iceberg.io.FileIO;
+import java.util.Set;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.avro.AvroIterable;
+import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.InclusiveMetricsEvaluator;
+import org.apache.iceberg.expressions.Projections;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.Types;
+
+import static org.apache.iceberg.expressions.Expressions.alwaysTrue;
/**
- * Reader for manifest files.
- * <p>
- * Create readers using {@link ManifestFiles#read(ManifestFile, FileIO, Map)}.
+ * Reader for data and delete manifest files.
+ *
+ * @param <F> The Java class of files returned by this reader.
*/
-public class ManifestReader extends BaseManifestReader<DataFile, ManifestReader> {
- ManifestReader(InputFile file, Map<Integer, PartitionSpec> specsById,
- InheritableMetadata inheritableMetadata) {
- super(file, specsById, inheritableMetadata, FileType.DATA_FILES);
+public class ManifestReader<F extends ContentFile<F>>
+ extends CloseableGroup implements CloseableIterable<F> {
+ static final ImmutableList<String> ALL_COLUMNS = ImmutableList.of("*");
+ private static final Set<String> STATS_COLUMNS = Sets.newHashSet(
+ "value_counts", "null_value_counts", "lower_bounds", "upper_bounds");
+
+ protected enum FileType {
+ DATA_FILES(GenericDataFile.class.getName()),
+ DELETE_FILES(GenericDeleteFile.class.getName());
+
+ private final String fileClass;
+
+ FileType(String fileClass) {
+ this.fileClass = fileClass;
+ }
+
+ private String fileClass() {
+ return fileClass;
+ }
+ }
+
+ private final InputFile file;
+ private final InheritableMetadata inheritableMetadata;
+ private final FileType content;
+ private final Map<String, String> metadata;
+ private final PartitionSpec spec;
+ private final Schema fileSchema;
+
+ // updated by configuration methods
+ private Expression partFilter = alwaysTrue();
+ private Expression rowFilter = alwaysTrue();
+ private Schema fileProjection = null;
+ private Collection<String> columns = null;
+ private boolean caseSensitive = true;
+
+ // lazily initialized
+ private Evaluator lazyEvaluator = null;
+ private InclusiveMetricsEvaluator lazyMetricsEvaluator = null;
+
+ protected ManifestReader(InputFile file, Map<Integer, PartitionSpec> specsById,
+ InheritableMetadata inheritableMetadata, FileType content) {
+ this.file = file;
+ this.inheritableMetadata = inheritableMetadata;
+ this.content = content;
+
+ try {
+ try (AvroIterable<ManifestEntry<F>> headerReader = Avro.read(file)
+ .project(ManifestEntry.getSchema(Types.StructType.of()).select("status"))
+ .build()) {
+ this.metadata = headerReader.getMetadata();
+ }
+ } catch (IOException e) {
+ throw new RuntimeIOException(e);
+ }
+
+ int specId = TableMetadata.INITIAL_SPEC_ID;
+ String specProperty = metadata.get("partition-spec-id");
+ if (specProperty != null) {
+ specId = Integer.parseInt(specProperty);
+ }
+
+ if (specsById != null) {
+ this.spec = specsById.get(specId);
+ } else {
+ Schema schema = SchemaParser.fromJson(metadata.get("schema"));
+ this.spec = PartitionSpecParser.fromJsonFields(schema, specId, metadata.get("partition-spec"));
+ }
+
+ this.fileSchema = new Schema(DataFile.getType(spec.partitionType()).fields());
+ }
+
+ public InputFile file() {
+ return file;
+ }
+
+ public Schema schema() {
+ return fileSchema;
+ }
+
+ public PartitionSpec spec() {
+ return spec;
+ }
+
+ public ManifestReader<F> select(Collection<String> newColumns) {
+ Preconditions.checkState(fileProjection == null,
+ "Cannot select columns using both select(String...) and project(Schema)");
+ this.columns = newColumns;
+ return this;
+ }
+
+ public ManifestReader<F> project(Schema newFileProjection) {
+ Preconditions.checkState(columns == null,
+ "Cannot select columns using both select(String...) and project(Schema)");
+ this.fileProjection = newFileProjection;
+ return this;
+ }
+
+ public ManifestReader<F> filterPartitions(Expression expr) {
+ this.partFilter = Expressions.and(partFilter, expr);
+ return this;
}
- protected ManifestReader self() {
+ public ManifestReader<F> filterRows(Expression expr) {
+ this.rowFilter = Expressions.and(rowFilter, expr);
return this;
}
+
+ public ManifestReader<F> caseSensitive(boolean isCaseSensitive) {
+ this.caseSensitive = isCaseSensitive;
+ return this;
+ }
+
+ CloseableIterable<ManifestEntry<F>> entries() {
+ if ((rowFilter != null && rowFilter != Expressions.alwaysTrue()) ||
+ (partFilter != null && partFilter != Expressions.alwaysTrue())) {
+ Evaluator evaluator = evaluator();
+ InclusiveMetricsEvaluator metricsEvaluator = metricsEvaluator();
+
+ // ensure stats columns are present for metrics evaluation
+ boolean requireStatsProjection = requireStatsProjection(rowFilter, columns);
+ Collection<String> projectColumns = requireStatsProjection ? withStatsColumns(columns) : columns;
+
+ return CloseableIterable.filter(
+ open(projection(fileSchema, fileProjection, projectColumns, caseSensitive)),
+ entry -> entry != null &&
+ evaluator.eval(entry.file().partition()) &&
+ metricsEvaluator.eval(entry.file()));
+ } else {
+ return open(projection(fileSchema, fileProjection, columns, caseSensitive));
+ }
+ }
+
+ private CloseableIterable<ManifestEntry<F>> open(Schema projection) {
+ FileFormat format = FileFormat.fromFileName(file.location());
+ Preconditions.checkArgument(format != null, "Unable to determine format of manifest: %s", file);
+
+ switch (format) {
+ case AVRO:
+ AvroIterable<ManifestEntry<F>> reader = Avro.read(file)
+ .project(ManifestEntry.wrapFileSchema(projection.asStruct()))
+ .rename("manifest_entry", GenericManifestEntry.class.getName())
+ .rename("partition", PartitionData.class.getName())
+ .rename("r102", PartitionData.class.getName())
+ .rename("data_file", content.fileClass())
+ .rename("r2", content.fileClass())
+ .classLoader(GenericManifestEntry.class.getClassLoader())
+ .reuseContainers()
+ .build();
+
+ addCloseable(reader);
+
+ return CloseableIterable.transform(reader, inheritableMetadata::apply);
+
+ default:
+ throw new UnsupportedOperationException("Invalid format for manifest file: " + format);
+ }
+ }
+
+ CloseableIterable<ManifestEntry<F>> liveEntries() {
+ return CloseableIterable.filter(entries(),
+ entry -> entry != null && entry.status() != ManifestEntry.Status.DELETED);
+ }
+
+ /**
+ * @return an Iterator of live files of type F (data or delete files). Makes defensive copies of files before returning
+ */
+ @Override
+ public CloseableIterator<F> iterator() {
+ if (dropStats(rowFilter, columns)) {
+ return CloseableIterable.transform(liveEntries(), e -> e.file().copyWithoutStats()).iterator();
+ } else {
+ return CloseableIterable.transform(liveEntries(), e -> e.file().copy()).iterator();
+ }
+ }
+
+ private static Schema projection(Schema schema, Schema project, Collection<String> columns, boolean caseSensitive) {
+ if (columns != null) {
+ if (caseSensitive) {
+ return schema.select(columns);
+ } else {
+ return schema.caseInsensitiveSelect(columns);
+ }
+ } else if (project != null) {
+ return project;
+ }
+
+ return schema;
+ }
+
+ private Evaluator evaluator() {
+ if (lazyEvaluator == null) {
+ Expression projected = Projections.inclusive(spec, caseSensitive).project(rowFilter);
+ Expression finalPartFilter = Expressions.and(projected, partFilter);
+ if (finalPartFilter != null) {
+ this.lazyEvaluator = new Evaluator(spec.partitionType(), finalPartFilter, caseSensitive);
+ } else {
+ this.lazyEvaluator = new Evaluator(spec.partitionType(), Expressions.alwaysTrue(), caseSensitive);
+ }
+ }
+ return lazyEvaluator;
+ }
+
+ private InclusiveMetricsEvaluator metricsEvaluator() {
+ if (lazyMetricsEvaluator == null) {
+ if (rowFilter != null) {
+ this.lazyMetricsEvaluator = new InclusiveMetricsEvaluator(
+ spec.schema(), rowFilter, caseSensitive);
+ } else {
+ this.lazyMetricsEvaluator = new InclusiveMetricsEvaluator(
+ spec.schema(), Expressions.alwaysTrue(), caseSensitive);
+ }
+ }
+ return lazyMetricsEvaluator;
+ }
+
+ private static boolean requireStatsProjection(Expression rowFilter, Collection<String> columns) {
+ // Make sure we have all stats columns for metrics evaluator
+ return rowFilter != Expressions.alwaysTrue() &&
+ !columns.containsAll(ManifestReader.ALL_COLUMNS) &&
+ !columns.containsAll(STATS_COLUMNS);
+ }
+
+ static boolean dropStats(Expression rowFilter, Collection<String> columns) {
+ // Make sure we only drop all stats if we had projected all stats
+ // We do not drop stats even if we had partially added some stats columns
+ return rowFilter != Expressions.alwaysTrue() &&
+ !columns.containsAll(ManifestReader.ALL_COLUMNS) &&
+ Sets.intersection(Sets.newHashSet(columns), STATS_COLUMNS).isEmpty();
+ }
+
+ private static Collection<String> withStatsColumns(Collection<String> columns) {
+ if (columns.containsAll(ManifestReader.ALL_COLUMNS)) {
+ return columns;
+ } else {
+ List<String> projectColumns = Lists.newArrayList(columns);
+ projectColumns.addAll(STATS_COLUMNS); // order doesn't matter
+ return projectColumns;
+ }
+ }
}
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index dad5018..3076df3 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -524,7 +524,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
}
private boolean manifestHasDeletedFiles(
- StrictMetricsEvaluator metricsEvaluator, ManifestReader reader,
+ StrictMetricsEvaluator metricsEvaluator, ManifestReader<DataFile> reader,
CharSequenceWrapper pathWrapper, StructLikeWrapper partitionWrapper) {
Evaluator inclusive = extractInclusiveDeleteExpression(reader);
Evaluator strict = extractStrictDeleteExpression(reader);
@@ -550,7 +550,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
}
private ManifestFile filterManifestWithDeletedFiles(
- StrictMetricsEvaluator metricsEvaluator, ManifestFile manifest, ManifestReader reader,
+ StrictMetricsEvaluator metricsEvaluator, ManifestFile manifest, ManifestReader<DataFile> reader,
CharSequenceWrapper pathWrapper, StructLikeWrapper partitionWrapper) throws IOException {
Evaluator inclusive = extractInclusiveDeleteExpression(reader);
Evaluator strict = extractStrictDeleteExpression(reader);
@@ -673,7 +673,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
ManifestWriter<DataFile> writer = newManifestWriter(ops.current().spec(specId));
try {
for (ManifestFile manifest : bin) {
- try (ManifestReader reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) {
+ try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) {
for (ManifestEntry<DataFile> entry : reader.entries()) {
if (entry.status() == Status.DELETED) {
// suppress deletes from previous snapshots. only files deleted by this snapshot
diff --git a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
index 5f33bfd..83424d9 100644
--- a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
+++ b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
@@ -22,7 +22,6 @@ package org.apache.iceberg;
import java.io.IOException;
import java.util.Date;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
@@ -31,7 +30,6 @@ import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.NotFoundException;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -351,7 +349,7 @@ class RemoveSnapshots implements ExpireSnapshots {
.onFailure((item, exc) -> LOG.warn("Failed to get deleted files: this may cause orphaned data files", exc))
.run(manifest -> {
// the manifest has deletes, scan it to find files to delete
- try (BaseManifestReader<?, ?> reader = openManifest(manifest, ops.io(), ops.current().specsById())) {
+ try (ManifestReader<?> reader = ManifestFiles.open(manifest, ops.io(), ops.current().specsById())) {
for (ManifestEntry<?> entry : reader.entries()) {
// if the snapshot ID of the DELETE entry is no longer valid, the data can be deleted
if (entry.status() == ManifestEntry.Status.DELETED &&
@@ -371,7 +369,7 @@ class RemoveSnapshots implements ExpireSnapshots {
.onFailure((item, exc) -> LOG.warn("Failed to get added files: this may cause orphaned data files", exc))
.run(manifest -> {
// the manifest has deletes, scan it to find files to delete
- try (ManifestReader reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) {
+ try (ManifestReader<?> reader = ManifestFiles.open(manifest, ops.io(), ops.current().specsById())) {
for (ManifestEntry<?> entry : reader.entries()) {
// delete any ADDED file from manifests that were reverted
if (entry.status() == ManifestEntry.Status.ADDED) {
@@ -403,15 +401,4 @@ class RemoveSnapshots implements ExpireSnapshots {
return CloseableIterable.withNoopClose(snapshot.allManifests());
}
}
-
- private static BaseManifestReader<?, ?> openManifest(ManifestFile manifest, FileIO io,
- Map<Integer, PartitionSpec> specsById) {
- switch (manifest.content()) {
- case DATA:
- return ManifestFiles.read(manifest, io, specsById);
- case DELETES:
- return ManifestFiles.readDeleteManifest(manifest, io, specsById);
- }
- throw new UnsupportedOperationException("Cannot read unknown manifest type: " + manifest.content());
- }
}
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 0daf5f7..77565e7 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -347,7 +347,7 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
}
private static ManifestFile addMetadata(TableOperations ops, ManifestFile manifest) {
- try (ManifestReader reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) {
+ try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) {
PartitionSummary stats = new PartitionSummary(ops.current().spec(manifest.partitionSpecId()));
int addedFiles = 0;
long addedRows = 0L;
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestReader.java b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
index e899fb7..4e91131 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestReader.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
@@ -46,7 +46,7 @@ public class TestManifestReader extends TableTestBase {
@Test
public void testManifestReaderWithEmptyInheritableMetadata() throws IOException {
ManifestFile manifest = writeManifest(1000L, manifestEntry(Status.EXISTING, 1000L, FILE_A));
- try (ManifestReader reader = ManifestFiles.read(manifest, FILE_IO)) {
+ try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, FILE_IO)) {
ManifestEntry<DataFile> entry = Iterables.getOnlyElement(reader.entries());
Assert.assertEquals(Status.EXISTING, entry.status());
Assert.assertEquals(FILE_A.path(), entry.file().path());
@@ -66,7 +66,7 @@ public class TestManifestReader extends TableTestBase {
@Test
public void testManifestReaderWithPartitionMetadata() throws IOException {
ManifestFile manifest = writeManifest(1000L, manifestEntry(Status.EXISTING, 123L, FILE_A));
- try (ManifestReader reader = ManifestFiles.read(manifest, FILE_IO)) {
+ try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, FILE_IO)) {
ManifestEntry<DataFile> entry = Iterables.getOnlyElement(reader.entries());
Assert.assertEquals(123L, (long) entry.snapshotId());
@@ -87,7 +87,7 @@ public class TestManifestReader extends TableTestBase {
table.ops().commit(table.ops().current(), table.ops().current().updatePartitionSpec(spec));
ManifestFile manifest = writeManifest(1000L, manifestEntry(Status.EXISTING, 123L, FILE_A));
- try (ManifestReader reader = ManifestFiles.read(manifest, FILE_IO)) {
+ try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, FILE_IO)) {
ManifestEntry<DataFile> entry = Iterables.getOnlyElement(reader.entries());
Assert.assertEquals(123L, (long) entry.snapshotId());
diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
index 74a46de..7a2c5fb 100644
--- a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
+++ b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
@@ -116,7 +116,7 @@ public class TestRewriteManifests extends TableTestBase {
// get the correct file order
List<DataFile> files;
List<Long> ids;
- try (ManifestReader reader = ManifestFiles.read(manifests.get(0), table.io())) {
+ try (ManifestReader<DataFile> reader = ManifestFiles.read(manifests.get(0), table.io())) {
if (reader.iterator().next().path().equals(FILE_A.path())) {
files = Arrays.asList(FILE_A, FILE_B);
ids = Arrays.asList(manifestAppendId, fileAppendId);
@@ -190,7 +190,7 @@ public class TestRewriteManifests extends TableTestBase {
// get the file order correct
List<DataFile> files;
List<Long> ids;
- try (ManifestReader reader = ManifestFiles.read(manifests.get(0), table.io())) {
+ try (ManifestReader<DataFile> reader = ManifestFiles.read(manifests.get(0), table.io())) {
if (reader.iterator().next().path().equals(FILE_A.path())) {
files = Arrays.asList(FILE_A, FILE_B);
ids = Arrays.asList(appendIdA, appendIdB);
@@ -232,7 +232,7 @@ public class TestRewriteManifests extends TableTestBase {
table.rewriteManifests()
.clusterBy(file -> "file")
.rewriteIf(manifest -> {
- try (ManifestReader reader = ManifestFiles.read(manifest, table.io())) {
+ try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, table.io())) {
return !reader.iterator().next().path().equals(FILE_A.path());
} catch (IOException x) {
throw new RuntimeIOException(x);
@@ -246,7 +246,7 @@ public class TestRewriteManifests extends TableTestBase {
// get the file order correct
List<DataFile> files;
List<Long> ids;
- try (ManifestReader reader = ManifestFiles.read(manifests.get(0), table.io())) {
+ try (ManifestReader<DataFile> reader = ManifestFiles.read(manifests.get(0), table.io())) {
if (reader.iterator().next().path().equals(FILE_B.path())) {
files = Arrays.asList(FILE_B, FILE_C);
ids = Arrays.asList(appendIdB, appendIdC);
@@ -316,7 +316,7 @@ public class TestRewriteManifests extends TableTestBase {
table.rewriteManifests()
.clusterBy(file -> "file")
.rewriteIf(manifest -> {
- try (ManifestReader reader = ManifestFiles.read(manifest, table.io())) {
+ try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, table.io())) {
return !reader.iterator().next().path().equals(FILE_A.path());
} catch (IOException x) {
throw new RuntimeIOException(x);
@@ -336,7 +336,7 @@ public class TestRewriteManifests extends TableTestBase {
// get the file order correct
List<DataFile> files;
List<Long> ids;
- try (ManifestReader reader = ManifestFiles.read(manifests.get(0), table.io())) {
+ try (ManifestReader<DataFile> reader = ManifestFiles.read(manifests.get(0), table.io())) {
if (reader.iterator().next().path().equals(FILE_A.path())) {
files = Arrays.asList(FILE_A, FILE_B);
ids = Arrays.asList(appendIdA, appendIdB);
@@ -899,7 +899,7 @@ public class TestRewriteManifests extends TableTestBase {
.addManifest(newManifest)
.clusterBy(dataFile -> "const-value")
.rewriteIf(manifest -> {
- try (ManifestReader reader = ManifestFiles.read(manifest, table.io())) {
+ try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, table.io())) {
return !reader.iterator().next().path().equals(FILE_B.path());
} catch (IOException x) {
throw new RuntimeIOException(x);
diff --git a/core/src/test/java/org/apache/iceberg/TestTransaction.java b/core/src/test/java/org/apache/iceberg/TestTransaction.java
index 2182608..577480f 100644
--- a/core/src/test/java/org/apache/iceberg/TestTransaction.java
+++ b/core/src/test/java/org/apache/iceberg/TestTransaction.java
@@ -502,7 +502,7 @@ public class TestTransaction extends TableTestBase {
// create a manifest append
OutputFile manifestLocation = Files.localOutput("/tmp/" + UUID.randomUUID().toString() + ".avro");
- ManifestWriter writer = ManifestFiles.write(table.spec(), manifestLocation);
+ ManifestWriter<DataFile> writer = ManifestFiles.write(table.spec(), manifestLocation);
try {
writer.add(FILE_D);
} finally {
diff --git a/spark/src/test/java/org/apache/iceberg/spark/TestSparkDataFile.java b/spark/src/test/java/org/apache/iceberg/spark/TestSparkDataFile.java
index 47c9caf..d0967b2 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/TestSparkDataFile.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/TestSparkDataFile.java
@@ -157,7 +157,7 @@ public class TestSparkDataFile {
Assert.assertEquals("Should have 1 manifest", 1, manifests.size());
List<DataFile> dataFiles = Lists.newArrayList();
- try (ManifestReader reader = ManifestFiles.read(manifests.get(0), table.io())) {
+ try (ManifestReader<DataFile> reader = ManifestFiles.read(manifests.get(0), table.io())) {
reader.forEach(dataFile -> dataFiles.add(dataFile.copy()));
}