You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2018/12/06 01:20:10 UTC
[incubator-iceberg] branch master updated: Add manifest listing
files (#21)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 54f9a0f Add manifest listing files (#21)
54f9a0f is described below
commit 54f9a0ffaa0cc69a25818fcdfbc9b8bfc579fe67
Author: Ryan Blue <rd...@users.noreply.github.com>
AuthorDate: Wed Dec 5 17:20:06 2018 -0800
Add manifest listing files (#21)
* Add ManifestFile and migrate Snapshot to return it.
* Optionally write manifest lists to separate files.
This adds a new table property, write.manifest-lists.enabled, that
defaults to false. When enabled, new snapshot manifest lists will be
written into separate files. The file location will be stored in the
snapshot metadata as "manifest-list".
* Aggregate partition field summaries when writing manifests.
* Add InclusiveManifestEvaluator.
This expression evaluator determines whether a manifest needs to be
scanned or whether it cannot contain data files matching a partition
predicate.
* Add file length to ManifestFile.
* Ensure files in manifest lists have helpful metadata.
This modifies SnapshotUpdate when writing a snapshot with a manifest
list file. If files for the manifest list do not have full metadata,
then this will scan the manifests to add metadata, including snapshot
ID, added/existing/deleted count, and partition field summaries.
* Add partitions name mapping when reading Snapshot manifest list.
* Update ScanSummary and FileHistory to use ManifestFile metadata.
This optimizes ScanSummary and FileHistory to ignore manifests that
cannot have changes in the configured time range.
---
api/src/main/java/com/netflix/iceberg/Files.java | 5 +
.../java/com/netflix/iceberg/ManifestFile.java | 144 +++++++++
.../main/java/com/netflix/iceberg/Snapshot.java | 9 +-
.../iceberg/expressions/BoundReference.java | 4 +
.../expressions/InclusiveManifestEvaluator.java | 240 ++++++++++++++
.../com/netflix/iceberg/expressions/Literals.java | 1 +
.../java/com/netflix/iceberg/io/OutputFile.java | 6 +
.../com/netflix/iceberg/types/Comparators.java | 31 ++
.../test/java/com/netflix/iceberg/TestHelpers.java | 101 ++++++
.../com/netflix/iceberg/TestPartitionPaths.java | 1 +
.../TestInclusiveManifestEvaluator.java | 295 +++++++++++++++++
.../java/com/netflix/iceberg/BaseSnapshot.java | 57 +++-
.../java/com/netflix/iceberg/BaseTableScan.java | 22 +-
.../main/java/com/netflix/iceberg/FastAppend.java | 45 +--
.../main/java/com/netflix/iceberg/FileHistory.java | 14 +-
.../com/netflix/iceberg/GenericManifestFile.java | 311 ++++++++++++++++++
.../iceberg/GenericPartitionFieldSummary.java | 191 +++++++++++
.../java/com/netflix/iceberg/ManifestGroup.java | 20 +-
.../com/netflix/iceberg/ManifestListWriter.java | 77 +++++
.../java/com/netflix/iceberg/ManifestWriter.java | 49 ++-
.../com/netflix/iceberg/MergingSnapshotUpdate.java | 356 +++++++++------------
.../java/com/netflix/iceberg/OverwriteData.java | 2 +-
.../java/com/netflix/iceberg/PartitionSummary.java | 97 ++++++
.../java/com/netflix/iceberg/RemoveSnapshots.java | 12 +-
.../iceberg/ReplacePartitionsOperation.java | 2 +-
.../main/java/com/netflix/iceberg/ScanSummary.java | 51 ++-
.../java/com/netflix/iceberg/SnapshotParser.java | 39 ++-
.../java/com/netflix/iceberg/SnapshotUpdate.java | 125 +++++++-
.../java/com/netflix/iceberg/TableMetadata.java | 8 +
.../java/com/netflix/iceberg/TableProperties.java | 3 +
.../main/java/com/netflix/iceberg/avro/Avro.java | 6 +-
.../netflix/iceberg/hadoop/HadoopOutputFile.java | 6 +
.../com/netflix/iceberg/LocalTableOperations.java | 80 +++++
.../java/com/netflix/iceberg/TableTestBase.java | 31 +-
.../java/com/netflix/iceberg/TestFastAppend.java | 22 +-
.../java/com/netflix/iceberg/TestMergeAppend.java | 38 +--
.../java/com/netflix/iceberg/TestReplaceFiles.java | 20 +-
.../java/com/netflix/iceberg/TestSnapshotJson.java | 57 +++-
.../com/netflix/iceberg/TestTableMetadataJson.java | 38 ++-
.../java/com/netflix/iceberg/TestTransaction.java | 24 +-
40 files changed, 2274 insertions(+), 366 deletions(-)
diff --git a/api/src/main/java/com/netflix/iceberg/Files.java b/api/src/main/java/com/netflix/iceberg/Files.java
index b227199..f739751 100644
--- a/api/src/main/java/com/netflix/iceberg/Files.java
+++ b/api/src/main/java/com/netflix/iceberg/Files.java
@@ -76,6 +76,11 @@ public class Files {
}
@Override
+ public InputFile toInputFile() {
+ return localInput(file);
+ }
+
+ @Override
public String toString() {
return location();
}
diff --git a/api/src/main/java/com/netflix/iceberg/ManifestFile.java b/api/src/main/java/com/netflix/iceberg/ManifestFile.java
new file mode 100644
index 0000000..b1d919b
--- /dev/null
+++ b/api/src/main/java/com/netflix/iceberg/ManifestFile.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+import com.netflix.iceberg.types.Types;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import static com.netflix.iceberg.types.Types.NestedField.optional;
+import static com.netflix.iceberg.types.Types.NestedField.required;
+
+/**
+ * Represents a manifest file that can be scanned to find data files in a table.
+ */
+public interface ManifestFile {
+ Schema SCHEMA = new Schema(
+ required(500, "manifest_path", Types.StringType.get()),
+ required(501, "manifest_length", Types.LongType.get()),
+ required(502, "partition_spec_id", Types.IntegerType.get()),
+ optional(503, "added_snapshot_id", Types.LongType.get()),
+ optional(504, "added_data_files_count", Types.IntegerType.get()),
+ optional(505, "existing_data_files_count", Types.IntegerType.get()),
+ optional(506, "deleted_data_files_count", Types.IntegerType.get()),
+ optional(507, "partitions", Types.ListType.ofRequired(508, Types.StructType.of(
+ required(509, "contains_null", Types.BooleanType.get()),
+ optional(510, "lower_bound", Types.BinaryType.get()), // null if no non-null values
+ optional(511, "upper_bound", Types.BinaryType.get()) // null if no non-null values
+ ))));
+
+ static Schema schema() {
+ return SCHEMA;
+ }
+
+ /**
+ * @return fully qualified path to the file, suitable for constructing a Hadoop Path
+ */
+ String path();
+
+ /**
+ * @return length of the manifest file
+ */
+ long length();
+
+ /**
+ * @return ID of the {@link PartitionSpec} used to write the manifest file
+ */
+ int partitionSpecId();
+
+ /**
+ * @return ID of the snapshot that added the manifest file to table metadata, or null if it is not available
+ */
+ Long snapshotId();
+
+ /**
+ * @return the number of data files with status ADDED in the manifest file, or null if it is not available
+ */
+ Integer addedFilesCount();
+
+ /**
+ * @return the number of data files with status EXISTING in the manifest file, or null if it is not available
+ */
+ Integer existingFilesCount();
+
+ /**
+ * @return the number of data files with status DELETED in the manifest file, or null if it is not available
+ */
+ Integer deletedFilesCount();
+
+ /**
+ * Returns a list of {@link PartitionFieldSummary partition field summaries}.
+ * <p>
+ * Each summary corresponds to a field in the manifest file's partition spec, by ordinal. For
+ * example, the partition spec [ ts_day=date(ts), type=identity(type) ] will have 2 summaries.
+ * The first summary is for the ts_day partition field and the second is for the type partition
+ * field.
+ *
+ * @return a list of partition field summaries, one for each field in the manifest's spec, or null if not available
+ */
+ List<PartitionFieldSummary> partitions();
+
+ /**
+ * Copies this {@link ManifestFile manifest file}. Readers can reuse manifest file instances; use
+ * this method to make defensive copies.
+ *
+ * @return a copy of this manifest file
+ */
+ ManifestFile copy();
+
+ /**
+ * Summarizes the values of one partition field stored in a manifest file.
+ */
+ interface PartitionFieldSummary {
+ Types.StructType TYPE = ManifestFile.schema()
+ .findType("partitions")
+ .asListType()
+ .elementType()
+ .asStructType();
+
+ static Types.StructType getType() {
+ return TYPE;
+ }
+
+ /**
+ * @return true if at least one data file in the manifest has a null value for the field
+ */
+ boolean containsNull();
+
+ /**
+ * @return a ByteBuffer with a serialized bound less than or equal to all non-null values, or null if there are none
+ */
+ ByteBuffer lowerBound();
+
+ /**
+ * @return a ByteBuffer with a serialized bound greater than or equal to all non-null values, or null if there are none
+ */
+ ByteBuffer upperBound();
+
+ /**
+ * Copies this {@link PartitionFieldSummary summary}. Readers can reuse instances; use this
+ * method to make defensive copies.
+ *
+ * @return a copy of this partition field summary
+ */
+ PartitionFieldSummary copy();
+ }
+}
diff --git a/api/src/main/java/com/netflix/iceberg/Snapshot.java b/api/src/main/java/com/netflix/iceberg/Snapshot.java
index 7fc878b..89542dc 100644
--- a/api/src/main/java/com/netflix/iceberg/Snapshot.java
+++ b/api/src/main/java/com/netflix/iceberg/Snapshot.java
@@ -60,7 +60,7 @@ public interface Snapshot {
*
* @return a list of fully-qualified manifest locations
*/
- List<String> manifests();
+ List<ManifestFile> manifests();
/**
* Return all files added to the table in this snapshot.
@@ -81,4 +81,11 @@ public interface Snapshot {
* @return all files deleted from the table in this snapshot.
*/
Iterable<DataFile> deletedFiles();
+
+ /**
+ * Return the location of this snapshot's manifest list, or null if it is not separate.
+ *
+ * @return the location of the manifest list for this Snapshot
+ */
+ String manifestListLocation();
}
diff --git a/api/src/main/java/com/netflix/iceberg/expressions/BoundReference.java b/api/src/main/java/com/netflix/iceberg/expressions/BoundReference.java
index 0106f35..5a83650 100644
--- a/api/src/main/java/com/netflix/iceberg/expressions/BoundReference.java
+++ b/api/src/main/java/com/netflix/iceberg/expressions/BoundReference.java
@@ -55,6 +55,10 @@ public class BoundReference<T> implements Reference {
return fieldId;
}
+ public int pos() {
+ return pos;
+ }
+
public T get(StructLike struct) {
return struct.get(pos, javaType());
}
diff --git a/api/src/main/java/com/netflix/iceberg/expressions/InclusiveManifestEvaluator.java b/api/src/main/java/com/netflix/iceberg/expressions/InclusiveManifestEvaluator.java
new file mode 100644
index 0000000..cac617d
--- /dev/null
+++ b/api/src/main/java/com/netflix/iceberg/expressions/InclusiveManifestEvaluator.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg.expressions;
+
+import com.netflix.iceberg.ManifestFile;
+import com.netflix.iceberg.ManifestFile.PartitionFieldSummary;
+import com.netflix.iceberg.PartitionSpec;
+import com.netflix.iceberg.expressions.ExpressionVisitors.BoundExpressionVisitor;
+import com.netflix.iceberg.types.Conversions;
+import com.netflix.iceberg.types.Types.StructType;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import static com.netflix.iceberg.expressions.Expressions.rewriteNot;
+
+/**
+ * Evaluates an {@link Expression} on a {@link ManifestFile} to test whether the manifest contains
+ * matching partitions.
+ * <p>
+ * This evaluation is inclusive: it returns true if a manifest may match and false if it cannot.
+ * <p>
+ * Manifests are passed to {@link #eval(ManifestFile)}, which returns true if the manifest may contain
+ * data files that match the partition expression. Manifest files may be skipped if and only if the
+ * return value of {@code eval} is false.
+ */
+public class InclusiveManifestEvaluator {
+ private final StructType struct;
+ private final Expression expr;
+ private transient ThreadLocal<ManifestEvalVisitor> visitors = null; // created lazily by visitor()
+
+ private ManifestEvalVisitor visitor() {
+ if (visitors == null) {
+ this.visitors = ThreadLocal.withInitial(ManifestEvalVisitor::new);
+ }
+ return visitors.get();
+ }
+
+ public InclusiveManifestEvaluator(PartitionSpec spec, Expression rowFilter) {
+ this.struct = spec.partitionType();
+ this.expr = Binder.bind(struct, rewriteNot(Projections.inclusive(spec).project(rowFilter)));
+ }
+
+ /**
+ * Test whether the manifest may contain data files with rows that match the expression.
+ *
+ * @param manifest a manifest file
+ * @return false if the manifest cannot contain matching rows; true otherwise, including when it has no partition summaries
+ */
+ public boolean eval(ManifestFile manifest) {
+ return visitor().eval(manifest);
+ }
+
+ private static final boolean ROWS_MIGHT_MATCH = true;
+ private static final boolean ROWS_CANNOT_MATCH = false;
+
+ private class ManifestEvalVisitor extends BoundExpressionVisitor<Boolean> {
+ private List<PartitionFieldSummary> stats = null;
+
+ private boolean eval(ManifestFile manifest) {
+ this.stats = manifest.partitions();
+ if (stats == null) { // no partition summaries, so the manifest cannot be ruled out
+ return ROWS_MIGHT_MATCH;
+ }
+
+ return ExpressionVisitors.visit(expr, this);
+ }
+
+ @Override
+ public Boolean alwaysTrue() {
+ return ROWS_MIGHT_MATCH; // all rows match
+ }
+
+ @Override
+ public Boolean alwaysFalse() {
+ return ROWS_CANNOT_MATCH; // all rows fail
+ }
+
+ @Override
+ public Boolean not(Boolean result) {
+ return !result; // NOTE(review): the constructor applies rewriteNot, so this is presumably never reached - confirm
+ }
+
+ @Override
+ public Boolean and(Boolean leftResult, Boolean rightResult) {
+ return leftResult && rightResult;
+ }
+
+ @Override
+ public Boolean or(Boolean leftResult, Boolean rightResult) {
+ return leftResult || rightResult;
+ }
+
+ @Override
+ public <T> Boolean isNull(BoundReference<T> ref) {
+ // no need to check whether the field is required because binding evaluates that case
+ // if the partition field has no null values, the expression cannot match
+ if (!stats.get(ref.pos()).containsNull()) {
+ return ROWS_CANNOT_MATCH;
+ }
+
+ return ROWS_MIGHT_MATCH;
+ }
+
+ @Override
+ public <T> Boolean notNull(BoundReference<T> ref) {
+ // containsNull encodes whether at least one partition value is null, lowerBound is null if
+ // all partition values are null.
+ ByteBuffer lowerBound = stats.get(ref.pos()).lowerBound();
+ if (lowerBound == null) {
+ return ROWS_CANNOT_MATCH; // all values are null
+ }
+
+ return ROWS_MIGHT_MATCH;
+ }
+
+ @Override
+ public <T> Boolean lt(BoundReference<T> ref, Literal<T> lit) {
+ ByteBuffer lowerBound = stats.get(ref.pos()).lowerBound();
+ if (lowerBound == null) {
+ return ROWS_CANNOT_MATCH; // values are all null
+ }
+
+ T lower = Conversions.fromByteBuffer(ref.type(), lowerBound);
+
+ int cmp = lit.comparator().compare(lower, lit.value());
+ if (cmp >= 0) { // lower bound >= literal: no value can be < literal
+ return ROWS_CANNOT_MATCH;
+ }
+
+ return ROWS_MIGHT_MATCH;
+ }
+
+ @Override
+ public <T> Boolean ltEq(BoundReference<T> ref, Literal<T> lit) {
+ ByteBuffer lowerBound = stats.get(ref.pos()).lowerBound();
+ if (lowerBound == null) {
+ return ROWS_CANNOT_MATCH; // values are all null
+ }
+
+ T lower = Conversions.fromByteBuffer(ref.type(), lowerBound);
+
+ int cmp = lit.comparator().compare(lower, lit.value());
+ if (cmp > 0) { // lower bound > literal: no value can be <= literal
+ return ROWS_CANNOT_MATCH;
+ }
+
+ return ROWS_MIGHT_MATCH;
+ }
+
+ @Override
+ public <T> Boolean gt(BoundReference<T> ref, Literal<T> lit) {
+ ByteBuffer upperBound = stats.get(ref.pos()).upperBound();
+ if (upperBound == null) {
+ return ROWS_CANNOT_MATCH; // values are all null
+ }
+
+ T upper = Conversions.fromByteBuffer(ref.type(), upperBound);
+
+ int cmp = lit.comparator().compare(upper, lit.value());
+ if (cmp <= 0) { // upper bound <= literal: no value can be > literal
+ return ROWS_CANNOT_MATCH;
+ }
+
+ return ROWS_MIGHT_MATCH;
+ }
+
+ @Override
+ public <T> Boolean gtEq(BoundReference<T> ref, Literal<T> lit) {
+ ByteBuffer upperBound = stats.get(ref.pos()).upperBound();
+ if (upperBound == null) {
+ return ROWS_CANNOT_MATCH; // values are all null
+ }
+
+ T upper = Conversions.fromByteBuffer(ref.type(), upperBound);
+
+ int cmp = lit.comparator().compare(upper, lit.value());
+ if (cmp < 0) { // upper bound < literal: no value can be >= literal
+ return ROWS_CANNOT_MATCH;
+ }
+
+ return ROWS_MIGHT_MATCH;
+ }
+
+ @Override
+ public <T> Boolean eq(BoundReference<T> ref, Literal<T> lit) {
+ PartitionFieldSummary fieldStats = stats.get(ref.pos());
+ if (fieldStats.lowerBound() == null) {
+ return ROWS_CANNOT_MATCH; // values are all null and literal cannot contain null
+ }
+
+ T lower = Conversions.fromByteBuffer(ref.type(), fieldStats.lowerBound());
+ int cmp = lit.comparator().compare(lower, lit.value());
+ if (cmp > 0) { // literal is below the lower bound
+ return ROWS_CANNOT_MATCH;
+ }
+
+ T upper = Conversions.fromByteBuffer(ref.type(), fieldStats.upperBound());
+ cmp = lit.comparator().compare(upper, lit.value());
+ if (cmp < 0) { // literal is above the upper bound
+ return ROWS_CANNOT_MATCH;
+ }
+
+ return ROWS_MIGHT_MATCH;
+ }
+
+ @Override
+ public <T> Boolean notEq(BoundReference<T> ref, Literal<T> lit) {
+ // because the bounds are not necessarily a min or max value, this cannot be answered using
+ // them. notEq(col, X) with bounds (X, Y) doesn't guarantee that X is a value in col.
+ return ROWS_MIGHT_MATCH;
+ }
+
+ @Override
+ public <T> Boolean in(BoundReference<T> ref, Literal<T> lit) {
+ return ROWS_MIGHT_MATCH; // not evaluated against the bounds, so this never prunes
+ }
+
+ @Override
+ public <T> Boolean notIn(BoundReference<T> ref, Literal<T> lit) {
+ return ROWS_MIGHT_MATCH; // not evaluated against the bounds, so this never prunes
+ }
+ }
+}
diff --git a/api/src/main/java/com/netflix/iceberg/expressions/Literals.java b/api/src/main/java/com/netflix/iceberg/expressions/Literals.java
index 22ef41c..f4e5d4e 100644
--- a/api/src/main/java/com/netflix/iceberg/expressions/Literals.java
+++ b/api/src/main/java/com/netflix/iceberg/expressions/Literals.java
@@ -96,6 +96,7 @@ class Literals {
private final T value;
BaseLiteral(T value) {
+ Preconditions.checkNotNull(value, "Literal values cannot be null");
this.value = value;
}
diff --git a/api/src/main/java/com/netflix/iceberg/io/OutputFile.java b/api/src/main/java/com/netflix/iceberg/io/OutputFile.java
index 9d75805..f0f48ee 100644
--- a/api/src/main/java/com/netflix/iceberg/io/OutputFile.java
+++ b/api/src/main/java/com/netflix/iceberg/io/OutputFile.java
@@ -58,4 +58,10 @@ public interface OutputFile {
*/
String location();
+ /**
+ * Return an {@link InputFile} for the location of this output file.
+ *
+ * @return an input file for the location of this output file
+ */
+ InputFile toInputFile();
}
diff --git a/api/src/main/java/com/netflix/iceberg/types/Comparators.java b/api/src/main/java/com/netflix/iceberg/types/Comparators.java
index 9e2ce2d..6680f7d 100644
--- a/api/src/main/java/com/netflix/iceberg/types/Comparators.java
+++ b/api/src/main/java/com/netflix/iceberg/types/Comparators.java
@@ -19,10 +19,41 @@
package com.netflix.iceberg.types;
+import com.google.common.collect.ImmutableMap;
import java.nio.ByteBuffer;
import java.util.Comparator;
public class Comparators {
+ private static final ImmutableMap<Type.PrimitiveType, Comparator<?>> COMPARATORS = ImmutableMap
+ .<Type.PrimitiveType, Comparator<?>>builder()
+ .put(Types.BooleanType.get(), Comparator.naturalOrder())
+ .put(Types.IntegerType.get(), Comparator.naturalOrder())
+ .put(Types.LongType.get(), Comparator.naturalOrder())
+ .put(Types.FloatType.get(), Comparator.naturalOrder())
+ .put(Types.DoubleType.get(), Comparator.naturalOrder())
+ .put(Types.DateType.get(), Comparator.naturalOrder())
+ .put(Types.TimeType.get(), Comparator.naturalOrder())
+ .put(Types.TimestampType.withZone(), Comparator.naturalOrder())
+ .put(Types.TimestampType.withoutZone(), Comparator.naturalOrder())
+ .put(Types.StringType.get(), Comparators.charSequences())
+ .put(Types.UUIDType.get(), Comparator.naturalOrder())
+ .put(Types.BinaryType.get(), Comparators.unsignedBytes())
+ .build();
+
+ @SuppressWarnings("unchecked")
+ public static <T> Comparator<T> forType(Type.PrimitiveType type) {
+ Comparator<?> cmp = COMPARATORS.get(type);
+ if (cmp != null) {
+ return (Comparator<T>) cmp;
+ } else if (type instanceof Types.FixedType) {
+ return (Comparator<T>) Comparators.unsignedBytes();
+ } else if (type instanceof Types.DecimalType) {
+ return (Comparator<T>) Comparator.naturalOrder();
+ }
+
+ throw new UnsupportedOperationException("Cannot determine comparator for type: " + type);
+ }
+
public static Comparator<ByteBuffer> unsignedBytes() {
return UnsignedByteBufComparator.INSTANCE;
}
diff --git a/api/src/test/java/com/netflix/iceberg/TestHelpers.java b/api/src/test/java/com/netflix/iceberg/TestHelpers.java
index 118e11b..ceb1eed 100644
--- a/api/src/test/java/com/netflix/iceberg/TestHelpers.java
+++ b/api/src/test/java/com/netflix/iceberg/TestHelpers.java
@@ -176,6 +176,107 @@ public class TestHelpers {
}
}
+ public static class TestFieldSummary implements ManifestFile.PartitionFieldSummary {
+ private final boolean containsNull;
+ private final ByteBuffer lowerBound;
+ private final ByteBuffer upperBound;
+
+ public TestFieldSummary(boolean containsNull, ByteBuffer lowerBound, ByteBuffer upperBound) {
+ this.containsNull = containsNull;
+ this.lowerBound = lowerBound;
+ this.upperBound = upperBound;
+ }
+
+ @Override
+ public boolean containsNull() {
+ return containsNull;
+ }
+
+ @Override
+ public ByteBuffer lowerBound() {
+ return lowerBound;
+ }
+
+ @Override
+ public ByteBuffer upperBound() {
+ return upperBound;
+ }
+
+ @Override
+ public ManifestFile.PartitionFieldSummary copy() {
+ return this;
+ }
+ }
+
+ public static class TestManifestFile implements ManifestFile {
+ private final String path;
+ private final long length;
+ private final int specId;
+ private final Long snapshotId;
+ private final Integer addedFiles;
+ private final Integer existingFiles;
+ private final Integer deletedFiles;
+ private final List<PartitionFieldSummary> partitions;
+
+ public TestManifestFile(String path, long length, int specId, Long snapshotId,
+ Integer addedFiles, Integer existingFiles, Integer deletedFiles,
+ List<PartitionFieldSummary> partitions) {
+ this.path = path;
+ this.length = length;
+ this.specId = specId;
+ this.snapshotId = snapshotId;
+ this.addedFiles = addedFiles;
+ this.existingFiles = existingFiles;
+ this.deletedFiles = deletedFiles;
+ this.partitions = partitions;
+ }
+
+ @Override
+ public String path() {
+ return path;
+ }
+
+ @Override
+ public long length() {
+ return length;
+ }
+
+ @Override
+ public int partitionSpecId() {
+ return specId;
+ }
+
+ @Override
+ public Long snapshotId() {
+ return snapshotId;
+ }
+
+ @Override
+ public Integer addedFilesCount() {
+ return addedFiles;
+ }
+
+ @Override
+ public Integer existingFilesCount() {
+ return existingFiles;
+ }
+
+ @Override
+ public Integer deletedFilesCount() {
+ return deletedFiles;
+ }
+
+ @Override
+ public List<PartitionFieldSummary> partitions() {
+ return partitions;
+ }
+
+ @Override
+ public ManifestFile copy() {
+ return this;
+ }
+ }
+
public static class TestDataFile implements DataFile {
private final String path;
private final StructLike partition;
diff --git a/api/src/test/java/com/netflix/iceberg/TestPartitionPaths.java b/api/src/test/java/com/netflix/iceberg/TestPartitionPaths.java
index aab66a3..1253f1e 100644
--- a/api/src/test/java/com/netflix/iceberg/TestPartitionPaths.java
+++ b/api/src/test/java/com/netflix/iceberg/TestPartitionPaths.java
@@ -34,6 +34,7 @@ public class TestPartitionPaths {
);
@Test
+ @SuppressWarnings("unchecked")
public void testPartitionPath() {
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA)
.hour("ts")
diff --git a/api/src/test/java/com/netflix/iceberg/expressions/TestInclusiveManifestEvaluator.java b/api/src/test/java/com/netflix/iceberg/expressions/TestInclusiveManifestEvaluator.java
new file mode 100644
index 0000000..f92f700
--- /dev/null
+++ b/api/src/test/java/com/netflix/iceberg/expressions/TestInclusiveManifestEvaluator.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg.expressions;
+
+import com.google.common.collect.ImmutableList;
+import com.netflix.iceberg.ManifestFile;
+import com.netflix.iceberg.PartitionSpec;
+import com.netflix.iceberg.Schema;
+import com.netflix.iceberg.TestHelpers;
+import com.netflix.iceberg.exceptions.ValidationException;
+import com.netflix.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static com.netflix.iceberg.expressions.Expressions.and;
+import static com.netflix.iceberg.expressions.Expressions.equal;
+import static com.netflix.iceberg.expressions.Expressions.greaterThan;
+import static com.netflix.iceberg.expressions.Expressions.greaterThanOrEqual;
+import static com.netflix.iceberg.expressions.Expressions.isNull;
+import static com.netflix.iceberg.expressions.Expressions.lessThan;
+import static com.netflix.iceberg.expressions.Expressions.lessThanOrEqual;
+import static com.netflix.iceberg.expressions.Expressions.not;
+import static com.netflix.iceberg.expressions.Expressions.notEqual;
+import static com.netflix.iceberg.expressions.Expressions.notNull;
+import static com.netflix.iceberg.expressions.Expressions.or;
+import static com.netflix.iceberg.types.Conversions.toByteBuffer;
+import static com.netflix.iceberg.types.Types.NestedField.optional;
+import static com.netflix.iceberg.types.Types.NestedField.required;
+
+public class TestInclusiveManifestEvaluator {
+ private static final Schema SCHEMA = new Schema(
+ required(1, "id", Types.IntegerType.get()),
+ optional(4, "all_nulls", Types.StringType.get()),
+ optional(5, "some_nulls", Types.StringType.get()),
+ optional(6, "no_nulls", Types.StringType.get())
+ );
+
+ private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA)
+ .withSpecId(0)
+ .identity("id")
+ .identity("all_nulls")
+ .identity("some_nulls")
+ .identity("no_nulls")
+ .build();
+
+ private static final ByteBuffer INT_MIN = toByteBuffer(Types.IntegerType.get(), 30);
+ private static final ByteBuffer INT_MAX = toByteBuffer(Types.IntegerType.get(), 79);
+
+ private static final ByteBuffer STRING_MIN = toByteBuffer(Types.StringType.get(), "a");
+ private static final ByteBuffer STRING_MAX = toByteBuffer(Types.StringType.get(), "z");
+
+ private static final ManifestFile NO_STATS = new TestHelpers.TestManifestFile(
+ "manifest-list.avro", 1024, 0, System.currentTimeMillis(), null, null, null, null);
+
+ private static final ManifestFile FILE = new TestHelpers.TestManifestFile("manifest-list.avro",
+ 1024, 0, System.currentTimeMillis(), 5, 10, 0, ImmutableList.of(
+ new TestHelpers.TestFieldSummary(false, INT_MIN, INT_MAX),
+ new TestHelpers.TestFieldSummary(true, null, null),
+ new TestHelpers.TestFieldSummary(true, STRING_MIN, STRING_MAX),
+ new TestHelpers.TestFieldSummary(false, STRING_MIN, STRING_MAX)));
+
+ @Test
+ public void testAllNulls() {
+ boolean shouldRead = new InclusiveManifestEvaluator(SPEC, notNull("all_nulls")).eval(FILE);
+ Assert.assertFalse("Should skip: no non-null value in all null column", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(SPEC, notNull("some_nulls")).eval(FILE);
+ Assert.assertTrue("Should read: column with some nulls contains a non-null value", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(SPEC, notNull("no_nulls")).eval(FILE);
+ Assert.assertTrue("Should read: non-null column contains a non-null value", shouldRead);
+ }
+
+ @Test
+ public void testNoNulls() {
+ boolean shouldRead = new InclusiveManifestEvaluator(SPEC, isNull("all_nulls")).eval(FILE);
+ Assert.assertTrue("Should read: at least one null value in all null column", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(SPEC, isNull("some_nulls")).eval(FILE);
+ Assert.assertTrue("Should read: column with some nulls contains a null value", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(SPEC, isNull("no_nulls")).eval(FILE);
+ Assert.assertFalse("Should skip: non-null column contains no null values", shouldRead);
+ }
+
+ @Test
+ public void testMissingColumn() {
+ TestHelpers.assertThrows("Should complain about missing column in expression",
+ ValidationException.class, "Cannot find field 'missing'",
+ () -> new InclusiveManifestEvaluator(SPEC, lessThan("missing", 5)).eval(FILE));
+ }
+
+ @Test
+ public void testMissingStats() {
+ Expression[] exprs = new Expression[] {
+ lessThan("id", 5), lessThanOrEqual("id", 30), equal("id", 70),
+ greaterThan("id", 78), greaterThanOrEqual("id", 90), notEqual("id", 101),
+ isNull("id"), notNull("id")
+ };
+
+ for (Expression expr : exprs) {
+ boolean shouldRead = new InclusiveManifestEvaluator(SPEC, expr).eval(NO_STATS);
+ Assert.assertTrue("Should read when missing stats for expr: " + expr, shouldRead);
+ }
+ }
+
+ @Test
+ public void testNot() {
+ // this test case must use a real predicate, not alwaysTrue(), or binding will simplify it out
+ boolean shouldRead = new InclusiveManifestEvaluator(SPEC, not(lessThan("id", 5))).eval(FILE);
+ Assert.assertTrue("Should read: not(false)", shouldRead);
+
+ shouldRead = new InclusiveManifestEvaluator(SPEC, not(greaterThan("id", 5))).eval(FILE);
+ Assert.assertFalse("Should skip: not(true)", shouldRead);
+ }
+
+ @Test
+ public void testAnd() {
+ // this test case must use a real predicate, not alwaysTrue(), or binding will simplify it out
+ boolean shouldRead = new InclusiveManifestEvaluator(
+ SPEC, and(lessThan("id", 5), greaterThanOrEqual("id", 0))).eval(FILE); // id bounds are [30, 79]
+ Assert.assertFalse("Should skip: and(false, true)", shouldRead); // lt 5 cannot match; gtEq 0 might
+
+ shouldRead = new InclusiveManifestEvaluator(
+ SPEC, and(greaterThan("id", 5), lessThanOrEqual("id", 30))).eval(FILE);
+ Assert.assertTrue("Should read: and(true, true)", shouldRead); // both sides might match
+ }
+
+  /**
+   * Checks that a disjunction skips the manifest only when neither side can match.
+   */
+  @Test
+  public void testOr() {
+    // a real predicate is needed here: binding would simplify an alwaysTrue() operand away
+    Expression neitherMatches = or(lessThan("id", 5), greaterThanOrEqual("id", 80));
+    Assert.assertFalse("Should skip: or(false, false)",
+        new InclusiveManifestEvaluator(SPEC, neitherMatches).eval(FILE));
+
+    Expression secondMatches = or(lessThan("id", 5), greaterThanOrEqual("id", 60));
+    Assert.assertTrue("Should read: or(false, true)",
+        new InclusiveManifestEvaluator(SPEC, secondMatches).eval(FILE));
+  }
+
+  /**
+   * Checks less-than predicates with literals below, at, and above the lower bound of id (30).
+   */
+  @Test
+  public void testIntegerLt() {
+    boolean shouldRead = new InclusiveManifestEvaluator(SPEC, lessThan("id", 5)).eval(FILE);
+    Assert.assertFalse("Should not read: id range below lower bound (5 < 30)", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, lessThan("id", 30)).eval(FILE);
+    Assert.assertFalse("Should not read: id range below lower bound (30 is not < 30)", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, lessThan("id", 31)).eval(FILE);
+    Assert.assertTrue("Should read: one possible id", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, lessThan("id", 79)).eval(FILE);
+    Assert.assertTrue("Should read: many possible ids", shouldRead);
+  }
+
+  /**
+   * Checks less-than-or-equal predicates with literals below, at, and above the lower bound of
+   * id (30).
+   */
+  @Test
+  public void testIntegerLtEq() {
+    Assert.assertFalse("Should not read: id range below lower bound (5 < 30)",
+        new InclusiveManifestEvaluator(SPEC, lessThanOrEqual("id", 5)).eval(FILE));
+
+    Assert.assertFalse("Should not read: id range below lower bound (29 < 30)",
+        new InclusiveManifestEvaluator(SPEC, lessThanOrEqual("id", 29)).eval(FILE));
+
+    Assert.assertTrue("Should read: one possible id",
+        new InclusiveManifestEvaluator(SPEC, lessThanOrEqual("id", 30)).eval(FILE));
+
+    Assert.assertTrue("Should read: many possible ids",
+        new InclusiveManifestEvaluator(SPEC, lessThanOrEqual("id", 79)).eval(FILE));
+  }
+
+  /**
+   * Checks greater-than predicates with literals above, at, and below the upper bound of id (79).
+   */
+  @Test
+  public void testIntegerGt() {
+    boolean shouldRead = new InclusiveManifestEvaluator(SPEC, greaterThan("id", 85)).eval(FILE);
+    Assert.assertFalse("Should not read: id range above upper bound (85 > 79)", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, greaterThan("id", 79)).eval(FILE);
+    Assert.assertFalse("Should not read: id range above upper bound (79 is not > 79)", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, greaterThan("id", 78)).eval(FILE);
+    Assert.assertTrue("Should read: one possible id", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(SPEC, greaterThan("id", 75)).eval(FILE);
+    Assert.assertTrue("Should read: many possible ids", shouldRead);
+  }
+
+  /**
+   * Checks greater-than-or-equal predicates with literals above, at, and below the upper bound
+   * of id (79).
+   */
+  @Test
+  public void testIntegerGtEq() {
+    boolean shouldRead = new InclusiveManifestEvaluator(
+        SPEC, greaterThanOrEqual("id", 85)).eval(FILE);
+    Assert.assertFalse("Should not read: id range above upper bound (85 > 79)", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(
+        SPEC, greaterThanOrEqual("id", 80)).eval(FILE);
+    Assert.assertFalse("Should not read: id range above upper bound (80 > 79)", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(
+        SPEC, greaterThanOrEqual("id", 79)).eval(FILE);
+    Assert.assertTrue("Should read: one possible id", shouldRead);
+
+    shouldRead = new InclusiveManifestEvaluator(
+        SPEC, greaterThanOrEqual("id", 75)).eval(FILE);
+    Assert.assertTrue("Should read: many possible ids", shouldRead);
+  }
+
+  /**
+   * Checks equality predicates with literals below, at, between, and above the id bounds; only
+   * literals from the lower bound through the upper bound are expected to read.
+   */
+  @Test
+  public void testIntegerEq() {
+    Assert.assertFalse("Should not read: id below lower bound",
+        new InclusiveManifestEvaluator(SPEC, equal("id", 5)).eval(FILE));
+
+    Assert.assertFalse("Should not read: id below lower bound",
+        new InclusiveManifestEvaluator(SPEC, equal("id", 29)).eval(FILE));
+
+    Assert.assertTrue("Should read: id equal to lower bound",
+        new InclusiveManifestEvaluator(SPEC, equal("id", 30)).eval(FILE));
+
+    Assert.assertTrue("Should read: id between lower and upper bounds",
+        new InclusiveManifestEvaluator(SPEC, equal("id", 75)).eval(FILE));
+
+    Assert.assertTrue("Should read: id equal to upper bound",
+        new InclusiveManifestEvaluator(SPEC, equal("id", 79)).eval(FILE));
+
+    Assert.assertFalse("Should not read: id above upper bound",
+        new InclusiveManifestEvaluator(SPEC, equal("id", 80)).eval(FILE));
+
+    Assert.assertFalse("Should not read: id above upper bound",
+        new InclusiveManifestEvaluator(SPEC, equal("id", 85)).eval(FILE));
+  }
+
+  /**
+   * Checks not-equal predicates with literals below, at, between, and above the id bounds; every
+   * case is expected to read.
+   */
+  @Test
+  public void testIntegerNotEq() {
+    int[] literals = {5, 29, 30, 75, 79, 80, 85};
+    String[] failureMessages = {
+        "Should read: id below lower bound",
+        "Should read: id below lower bound",
+        "Should read: id equal to lower bound",
+        "Should read: id between lower and upper bounds",
+        "Should read: id equal to upper bound",
+        "Should read: id above upper bound",
+        "Should read: id above upper bound"
+    };
+
+    // cases run in the same order as the literals are listed
+    for (int i = 0; i < literals.length; i += 1) {
+      boolean shouldRead =
+          new InclusiveManifestEvaluator(SPEC, notEqual("id", literals[i])).eval(FILE);
+      Assert.assertTrue(failureMessages[i], shouldRead);
+    }
+  }
+
+  /**
+   * Same cases as the not-equal test, but written as not(equal(...)); every case is expected to
+   * read.
+   */
+  @Test
+  public void testIntegerNotEqRewritten() {
+    int[] literals = {5, 29, 30, 75, 79, 80, 85};
+    String[] failureMessages = {
+        "Should read: id below lower bound",
+        "Should read: id below lower bound",
+        "Should read: id equal to lower bound",
+        "Should read: id between lower and upper bounds",
+        "Should read: id equal to upper bound",
+        "Should read: id above upper bound",
+        "Should read: id above upper bound"
+    };
+
+    // cases run in the same order as the literals are listed
+    for (int i = 0; i < literals.length; i += 1) {
+      boolean shouldRead =
+          new InclusiveManifestEvaluator(SPEC, not(equal("id", literals[i]))).eval(FILE);
+      Assert.assertTrue(failureMessages[i], shouldRead);
+    }
+  }
+}
diff --git a/core/src/main/java/com/netflix/iceberg/BaseSnapshot.java b/core/src/main/java/com/netflix/iceberg/BaseSnapshot.java
index 5409b9a..945ddbb 100644
--- a/core/src/main/java/com/netflix/iceberg/BaseSnapshot.java
+++ b/core/src/main/java/com/netflix/iceberg/BaseSnapshot.java
@@ -20,21 +20,25 @@
package com.netflix.iceberg;
import com.google.common.base.Objects;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
+import com.netflix.iceberg.avro.Avro;
import com.netflix.iceberg.exceptions.RuntimeIOException;
-import com.netflix.iceberg.io.CloseableGroup;
+import com.netflix.iceberg.io.CloseableIterable;
+import com.netflix.iceberg.io.InputFile;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
-class BaseSnapshot extends CloseableGroup implements Snapshot {
+class BaseSnapshot implements Snapshot {
private final TableOperations ops;
private final long snapshotId;
private final Long parentId;
private final long timestampMillis;
- private final List<String> manifestFiles;
+ private final InputFile manifestList;
// lazily initialized
+ private List<ManifestFile> manifests = null;
private List<DataFile> adds = null;
private List<DataFile> deletes = null;
@@ -44,19 +48,30 @@ class BaseSnapshot extends CloseableGroup implements Snapshot {
BaseSnapshot(TableOperations ops,
long snapshotId,
String... manifestFiles) {
- this(ops, snapshotId, null, System.currentTimeMillis(), Arrays.asList(manifestFiles));
+ this(ops, snapshotId, null, System.currentTimeMillis(),
+ Lists.transform(Arrays.asList(manifestFiles),
+ path -> new GenericManifestFile(ops.newInputFile(path), 0)));
}
BaseSnapshot(TableOperations ops,
long snapshotId,
Long parentId,
long timestampMillis,
- List<String> manifestFiles) {
+ InputFile manifestList) {
this.ops = ops;
this.snapshotId = snapshotId;
this.parentId = parentId;
this.timestampMillis = timestampMillis;
- this.manifestFiles = manifestFiles;
+ this.manifestList = manifestList;
+ }
+
+ BaseSnapshot(TableOperations ops,
+ long snapshotId,
+ Long parentId,
+ long timestampMillis,
+ List<ManifestFile> manifests) {
+ this(ops, snapshotId, parentId, timestampMillis, (InputFile) null);
+ this.manifests = manifests;
}
@Override
@@ -75,8 +90,25 @@ class BaseSnapshot extends CloseableGroup implements Snapshot {
}
@Override
- public List<String> manifests() {
- return manifestFiles;
+ public List<ManifestFile> manifests() {
+ if (manifests == null) {
+ // if manifests isn't set, then the snapshotFile is set and should be read to get the list
+ try (CloseableIterable<ManifestFile> files = Avro.read(manifestList)
+ .rename("manifest_file", GenericManifestFile.class.getName())
+ .rename("partitions", GenericPartitionFieldSummary.class.getName())
+ .rename("r508", GenericPartitionFieldSummary.class.getName())
+ .project(ManifestFile.schema())
+ .reuseContainers(false)
+ .build()) {
+
+ this.manifests = Lists.newLinkedList(files);
+
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Cannot read snapshot file: %s", manifestList.location());
+ }
+ }
+
+ return manifests;
}
@Override
@@ -95,13 +127,18 @@ class BaseSnapshot extends CloseableGroup implements Snapshot {
return deletes;
}
+ @Override
+ public String manifestListLocation() {
+ return manifestList != null ? manifestList.location() : null;
+ }
+
private void cacheChanges() {
List<DataFile> adds = Lists.newArrayList();
List<DataFile> deletes = Lists.newArrayList();
// accumulate adds and deletes from all manifests.
// because manifests can be reused in newer snapshots, filter the changes by snapshot id.
- for (String manifest : manifestFiles) {
+ for (String manifest : Iterables.transform(manifests, ManifestFile::path)) {
try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest))) {
for (ManifestEntry add : reader.addedFiles()) {
if (add.snapshotId() == snapshotId) {
@@ -127,7 +164,7 @@ class BaseSnapshot extends CloseableGroup implements Snapshot {
return Objects.toStringHelper(this)
.add("id", snapshotId)
.add("timestamp_ms", timestampMillis)
- .add("manifests", manifestFiles)
+ .add("manifests", manifests)
.toString();
}
}
diff --git a/core/src/main/java/com/netflix/iceberg/BaseTableScan.java b/core/src/main/java/com/netflix/iceberg/BaseTableScan.java
index e99889e..ad20780 100644
--- a/core/src/main/java/com/netflix/iceberg/BaseTableScan.java
+++ b/core/src/main/java/com/netflix/iceberg/BaseTableScan.java
@@ -21,6 +21,9 @@ package com.netflix.iceberg;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
@@ -30,6 +33,8 @@ import com.netflix.iceberg.events.ScanEvent;
import com.netflix.iceberg.expressions.Binder;
import com.netflix.iceberg.expressions.Expression;
import com.netflix.iceberg.expressions.Expressions;
+import com.netflix.iceberg.expressions.InclusiveManifestEvaluator;
+import com.netflix.iceberg.expressions.Projections;
import com.netflix.iceberg.expressions.ResidualEvaluator;
import com.netflix.iceberg.io.CloseableIterable;
import com.netflix.iceberg.types.TypeUtil;
@@ -141,6 +146,16 @@ class BaseTableScan implements TableScan {
return new BaseTableScan(ops, table, snapshotId, schema, Expressions.and(rowFilter, expr));
}
+ private final LoadingCache<Integer, InclusiveManifestEvaluator> EVAL_CACHE = CacheBuilder
+ .newBuilder()
+ .build(new CacheLoader<Integer, InclusiveManifestEvaluator>() {
+ @Override
+ public InclusiveManifestEvaluator load(Integer specId) {
+ PartitionSpec spec = ops.current().spec(specId);
+ return new InclusiveManifestEvaluator(spec, rowFilter);
+ }
+ });
+
@Override
public CloseableIterable<FileScanTask> planFiles() {
Snapshot snapshot = snapshotId != null ?
@@ -155,11 +170,14 @@ class BaseTableScan implements TableScan {
Listeners.notifyAll(
new ScanEvent(table.toString(), snapshot.snapshotId(), rowFilter, schema));
+ Iterable<ManifestFile> matchingManifests = Iterables.filter(snapshot.manifests(),
+ manifest -> EVAL_CACHE.getUnchecked(manifest.partitionSpecId()).eval(manifest));
+
ConcurrentLinkedQueue<Closeable> toClose = new ConcurrentLinkedQueue<>();
Iterable<Iterable<FileScanTask>> readers = Iterables.transform(
- snapshot.manifests(),
+ matchingManifests,
manifest -> {
- ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest));
+ ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()));
toClose.add(reader);
String schemaString = SchemaParser.toJson(reader.spec().schema());
String specString = PartitionSpecParser.toJson(reader.spec());
diff --git a/core/src/main/java/com/netflix/iceberg/FastAppend.java b/core/src/main/java/com/netflix/iceberg/FastAppend.java
index de34545..278f059 100644
--- a/core/src/main/java/com/netflix/iceberg/FastAppend.java
+++ b/core/src/main/java/com/netflix/iceberg/FastAppend.java
@@ -35,7 +35,7 @@ import java.util.Set;
class FastAppend extends SnapshotUpdate implements AppendFiles {
private final PartitionSpec spec;
private final List<DataFile> newFiles = Lists.newArrayList();
- private String newManifestLocation = null;
+ private ManifestFile newManifest = null;
private boolean hasNewFiles = false;
FastAppend(TableOperations ops) {
@@ -51,11 +51,15 @@ class FastAppend extends SnapshotUpdate implements AppendFiles {
}
@Override
- public List<String> apply(TableMetadata base) {
- String location = writeManifest();
+ public List<ManifestFile> apply(TableMetadata base) {
+ List<ManifestFile> newManifests = Lists.newArrayList();
+
+ try {
+ newManifests.add(writeManifest());
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Failed to write manifest");
+ }
- List<String> newManifests = Lists.newArrayList();
- newManifests.add(location);
if (base.currentSnapshot() != null) {
newManifests.addAll(base.currentSnapshot().manifests());
}
@@ -64,33 +68,32 @@ class FastAppend extends SnapshotUpdate implements AppendFiles {
}
@Override
- protected void cleanUncommitted(Set<String> committed) {
- if (!committed.contains(newManifestLocation)) {
- deleteFile(newManifestLocation);
+ protected void cleanUncommitted(Set<ManifestFile> committed) {
+ if (!committed.contains(newManifest)) {
+ deleteFile(newManifest.path());
}
}
- private String writeManifest() {
- if (hasNewFiles && newManifestLocation != null) {
- deleteFile(newManifestLocation);
- hasNewFiles = false;
- newManifestLocation = null;
+ private ManifestFile writeManifest() throws IOException {
+ if (hasNewFiles && newManifest != null) {
+ deleteFile(newManifest.path());
+ newManifest = null;
}
- if (newManifestLocation == null) {
+ if (newManifest == null) {
OutputFile out = manifestPath(0);
- try (ManifestWriter writer = new ManifestWriter(spec, out, snapshotId())) {
-
+ ManifestWriter writer = new ManifestWriter(spec, out, snapshotId());
+ try {
writer.addAll(newFiles);
-
- } catch (IOException e) {
- throw new RuntimeIOException(e, "Failed to write manifest: %s", out);
+ } finally {
+ writer.close();
}
- this.newManifestLocation = out.location();
+ this.newManifest = writer.toManifestFile();
+ hasNewFiles = false;
}
- return newManifestLocation;
+ return newManifest;
}
}
diff --git a/core/src/main/java/com/netflix/iceberg/FileHistory.java b/core/src/main/java/com/netflix/iceberg/FileHistory.java
index 3ed8006..60146b0 100644
--- a/core/src/main/java/com/netflix/iceberg/FileHistory.java
+++ b/core/src/main/java/com/netflix/iceberg/FileHistory.java
@@ -32,6 +32,9 @@ import java.io.IOException;
import java.util.List;
import java.util.Set;
+import static com.google.common.collect.Iterables.concat;
+import static com.google.common.collect.Iterables.transform;
+
public class FileHistory {
private static final List<String> HISTORY_COLUMNS = ImmutableList.of("file_path");
@@ -91,12 +94,17 @@ public class FileHistory {
snapshots = Iterables.filter(snapshots, snap -> snap.timestampMillis() <= endTime);
}
+ // only use manifests that were added in the matching snapshots
+ Set<Long> matchingIds = Sets.newHashSet(transform(snapshots, snap -> snap.snapshotId()));
+ Iterable<ManifestFile> manifests = Iterables.filter(
+ concat(transform(snapshots, Snapshot::manifests)),
+ manifest -> manifest.snapshotId() == null || matchingIds.contains(manifest.snapshotId()));
+
// a manifest group will only read each manifest once
- ManifestGroup manifests = new ManifestGroup(((HasTableOperations) table).operations(),
- Iterables.concat(Iterables.transform(snapshots, Snapshot::manifests)));
+ ManifestGroup group = new ManifestGroup(((HasTableOperations) table).operations(), manifests);
List<ManifestEntry> results = Lists.newArrayList();
- try (CloseableIterable<ManifestEntry> entries = manifests.select(HISTORY_COLUMNS).entries()) {
+ try (CloseableIterable<ManifestEntry> entries = group.select(HISTORY_COLUMNS).entries()) {
// TODO: replace this with an IN predicate
CharSequenceWrapper locationWrapper = CharSequenceWrapper.wrap(null);
for (ManifestEntry entry : entries) {
diff --git a/core/src/main/java/com/netflix/iceberg/GenericManifestFile.java b/core/src/main/java/com/netflix/iceberg/GenericManifestFile.java
new file mode 100644
index 0000000..628515b
--- /dev/null
+++ b/core/src/main/java/com/netflix/iceberg/GenericManifestFile.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+import com.google.common.base.Objects;
+import com.netflix.iceberg.avro.AvroSchemaUtil;
+import com.netflix.iceberg.io.InputFile;
+import com.netflix.iceberg.types.Types;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.specific.SpecificData.SchemaConstructable;
+import java.io.Serializable;
+import java.util.List;
+
+import static com.google.common.collect.ImmutableList.copyOf;
+import static com.google.common.collect.Iterables.transform;
+
+/**
+ * A {@link ManifestFile} that also implements Avro's {@link IndexedRecord}, so its fields can be
+ * read and written by ordinal.
+ * <p>
+ * Equality and hash code are based only on the manifest path.
+ */
+public class GenericManifestFile
+    implements ManifestFile, StructLike, IndexedRecord, SchemaConstructable, Serializable {
+  private static final Schema AVRO_SCHEMA = AvroSchemaUtil.convert(
+      ManifestFile.schema(), "manifest_file");
+
+  private transient Schema avroSchema; // not final for Java serialization
+  // position in the projected schema -> position in the full ManifestFile schema; null when the
+  // instance was not created from an Avro schema
+  private int[] fromProjectionPos;
+
+  // data fields
+  private InputFile file = null;
+  private String manifestPath = null;
+  private Long length = null;
+  private int specId = -1;
+  private Long snapshotId = null;
+  private Integer addedFilesCount = null;
+  private Integer existingFilesCount = null;
+  private Integer deletedFilesCount = null;
+  private List<PartitionFieldSummary> partitions = null;
+
+  /**
+   * Used by Avro reflection to instantiate this class when reading manifest files.
+   *
+   * @param avroSchema the Avro schema of the records being read, possibly a projection
+   * @throws IllegalArgumentException if a field of the schema is not in the manifest file schema
+   */
+  public GenericManifestFile(org.apache.avro.Schema avroSchema) {
+    this.avroSchema = avroSchema;
+
+    List<Types.NestedField> fields = AvroSchemaUtil.convert(avroSchema)
+        .asNestedType()
+        .asStructType()
+        .fields();
+    List<Types.NestedField> allFields = ManifestFile.schema().asStruct().fields();
+
+    // match each incoming field to its position in the full schema by field id
+    this.fromProjectionPos = new int[fields.size()];
+    for (int i = 0; i < fromProjectionPos.length; i += 1) {
+      boolean found = false;
+      for (int j = 0; j < allFields.size(); j += 1) {
+        if (fields.get(i).fieldId() == allFields.get(j).fieldId()) {
+          found = true;
+          fromProjectionPos[i] = j;
+        }
+      }
+
+      if (!found) {
+        throw new IllegalArgumentException("Cannot find projected field: " + fields.get(i));
+      }
+    }
+  }
+
+  /**
+   * Creates a manifest file for an input file. Only the path and spec id are known; the length
+   * is loaded from the file on first use and all other metadata is left null.
+   *
+   * @param file the manifest's input file
+   * @param specId id of the partition spec for the manifest
+   */
+  GenericManifestFile(InputFile file, int specId) {
+    this.avroSchema = AVRO_SCHEMA;
+    this.file = file;
+    this.manifestPath = file.location();
+    this.length = null; // lazily loaded from file
+    this.specId = specId;
+    this.snapshotId = null;
+    this.addedFilesCount = null;
+    this.existingFilesCount = null;
+    this.deletedFilesCount = null;
+    this.partitions = null;
+    this.fromProjectionPos = null;
+  }
+
+  /**
+   * Creates a manifest file with all metadata fields set.
+   *
+   * @param path location of the manifest
+   * @param length length of the manifest
+   * @param specId id of the partition spec for the manifest
+   * @param snapshotId id of the snapshot stored with the manifest
+   * @param addedFilesCount number of added files
+   * @param existingFilesCount number of existing files
+   * @param deletedFilesCount number of deleted files
+   * @param partitions partition field summaries for the manifest
+   */
+  public GenericManifestFile(String path, long length, int specId, long snapshotId,
+                             int addedFilesCount, int existingFilesCount, int deletedFilesCount,
+                             List<PartitionFieldSummary> partitions) {
+    this.avroSchema = AVRO_SCHEMA;
+    this.manifestPath = path;
+    this.length = length;
+    this.specId = specId;
+    this.snapshotId = snapshotId;
+    this.addedFilesCount = addedFilesCount;
+    this.existingFilesCount = existingFilesCount;
+    this.deletedFilesCount = deletedFilesCount;
+    this.partitions = partitions;
+    this.fromProjectionPos = null;
+  }
+
+  /**
+   * Copy constructor. The input file reference is not copied.
+   *
+   * @param toCopy a generic manifest file to copy.
+   */
+  private GenericManifestFile(GenericManifestFile toCopy) {
+    this.avroSchema = toCopy.avroSchema;
+    this.manifestPath = toCopy.manifestPath;
+    this.length = toCopy.length;
+    this.specId = toCopy.specId;
+    this.snapshotId = toCopy.snapshotId;
+    this.addedFilesCount = toCopy.addedFilesCount;
+    this.existingFilesCount = toCopy.existingFilesCount;
+    this.deletedFilesCount = toCopy.deletedFilesCount;
+    // partitions are null for manifests created from an input file; transform rejects a null
+    // iterable, so only copy the summaries when they are present
+    if (toCopy.partitions != null) {
+      this.partitions = copyOf(transform(toCopy.partitions, PartitionFieldSummary::copy));
+    } else {
+      this.partitions = null;
+    }
+    this.fromProjectionPos = toCopy.fromProjectionPos;
+  }
+
+  /**
+   * Constructor for Java serialization.
+   */
+  GenericManifestFile() {
+  }
+
+  @Override
+  public String path() {
+    return manifestPath;
+  }
+
+  /**
+   * Returns the manifest length, reading it from the input file the first time it is needed.
+   *
+   * @return the length, or null if it is not set and there is no input file to load it from
+   */
+  public Long lazyLength() {
+    if (length == null) {
+      if (file != null) {
+        // this was created from an input file and length is lazily loaded
+        this.length = file.getLength();
+      } else {
+        // this was loaded from a file without projecting length, so there is nothing to load
+        // it from; report it as unknown and leave the failure to length()
+        return null;
+      }
+    }
+    return length;
+  }
+
+  /**
+   * Returns the manifest length.
+   *
+   * @throws NullPointerException if the length is not set and cannot be loaded from a file
+   */
+  @Override
+  public long length() {
+    // fail with a descriptive message rather than an anonymous NPE from unboxing null
+    return java.util.Objects.requireNonNull(
+        lazyLength(), () -> "Length is not available for manifest: " + manifestPath);
+  }
+
+  @Override
+  public int partitionSpecId() {
+    return specId;
+  }
+
+  @Override
+  public Long snapshotId() {
+    return snapshotId;
+  }
+
+  @Override
+  public Integer addedFilesCount() {
+    return addedFilesCount;
+  }
+
+  @Override
+  public Integer existingFilesCount() {
+    return existingFilesCount;
+  }
+
+  @Override
+  public Integer deletedFilesCount() {
+    return deletedFilesCount;
+  }
+
+  @Override
+  public List<PartitionFieldSummary> partitions() {
+    return partitions;
+  }
+
+  @Override
+  public int size() {
+    return ManifestFile.schema().columns().size();
+  }
+
+  @Override
+  public <T> T get(int pos, Class<T> javaClass) {
+    return javaClass.cast(get(pos));
+  }
+
+  @Override
+  public void put(int i, Object v) {
+    set(i, v);
+  }
+
+  /**
+   * Returns the field at the given ordinal of this record's schema.
+   *
+   * @throws UnsupportedOperationException if the ordinal does not map to a known field
+   */
+  @Override
+  public Object get(int i) {
+    int pos = i;
+    // if the schema was projected, map the incoming ordinal to the expected one
+    if (fromProjectionPos != null) {
+      pos = fromProjectionPos[i];
+    }
+    switch (pos) {
+      case 0:
+        return manifestPath;
+      case 1:
+        return lazyLength();
+      case 2:
+        return specId;
+      case 3:
+        return snapshotId;
+      case 4:
+        return addedFilesCount;
+      case 5:
+        return existingFilesCount;
+      case 6:
+        return deletedFilesCount;
+      case 7:
+        return partitions;
+      default:
+        throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
+    }
+  }
+
+  /**
+   * Sets the field at the given ordinal of this record's schema. Unknown ordinals are ignored.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public <T> void set(int i, T value) {
+    int pos = i;
+    // if the schema was projected, map the incoming ordinal to the expected one
+    if (fromProjectionPos != null) {
+      pos = fromProjectionPos[i];
+    }
+    switch (pos) {
+      case 0:
+        // always coerce to String for Serializable
+        this.manifestPath = value.toString();
+        return;
+      case 1:
+        this.length = (Long) value;
+        return;
+      case 2:
+        this.specId = (Integer) value;
+        return;
+      case 3:
+        this.snapshotId = (Long) value;
+        return;
+      case 4:
+        this.addedFilesCount = (Integer) value;
+        return;
+      case 5:
+        this.existingFilesCount = (Integer) value;
+        return;
+      case 6:
+        this.deletedFilesCount = (Integer) value;
+        return;
+      case 7:
+        this.partitions = (List<PartitionFieldSummary>) value;
+        return;
+      default:
+        // ignore the object, it must be from a newer version of the format
+    }
+  }
+
+  @Override
+  public ManifestFile copy() {
+    return new GenericManifestFile(this);
+  }
+
+  @Override
+  public Schema getSchema() {
+    return avroSchema;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    if (other == null || getClass() != other.getClass()) {
+      return false;
+    }
+    // two manifest files are equal when they have the same path
+    GenericManifestFile that = (GenericManifestFile) other;
+    return Objects.equal(manifestPath, that.manifestPath);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(manifestPath);
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+        .add("path", manifestPath)
+        .add("length", length)
+        .add("partition_spec_id", specId)
+        .add("added_snapshot_id", snapshotId)
+        .add("added_data_files_count", addedFilesCount)
+        .add("existing_data_files_count", existingFilesCount)
+        .add("deleted_data_files_count", deletedFilesCount)
+        .add("partitions", partitions)
+        .toString();
+  }
+}
diff --git a/core/src/main/java/com/netflix/iceberg/GenericPartitionFieldSummary.java b/core/src/main/java/com/netflix/iceberg/GenericPartitionFieldSummary.java
new file mode 100644
index 0000000..0c57cb3
--- /dev/null
+++ b/core/src/main/java/com/netflix/iceberg/GenericPartitionFieldSummary.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+import com.google.common.base.Objects;
+import com.netflix.iceberg.ManifestFile.PartitionFieldSummary;
+import com.netflix.iceberg.avro.AvroSchemaUtil;
+import com.netflix.iceberg.types.Types;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.specific.SpecificData.SchemaConstructable;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * A {@link PartitionFieldSummary} that also implements Avro's {@link IndexedRecord}, so its
+ * fields can be read and written by ordinal.
+ */
+public class GenericPartitionFieldSummary
+    implements PartitionFieldSummary, StructLike, IndexedRecord, SchemaConstructable, Serializable {
+  private static final Schema AVRO_SCHEMA = AvroSchemaUtil.convert(PartitionFieldSummary.getType());
+
+  private transient Schema avroSchema; // not final for Java serialization
+  private int[] fromProjectionPos;
+
+  // data fields
+  private boolean containsNull = false;
+  private ByteBuffer lowerBound = null;
+  private ByteBuffer upperBound = null;
+
+  /**
+   * Used by Avro reflection to instantiate this class when reading manifest files.
+   *
+   * @param avroSchema the Avro schema of the records being read, possibly a projection
+   * @throws IllegalArgumentException if a field of the schema is not in the summary type
+   */
+  public GenericPartitionFieldSummary(Schema avroSchema) {
+    this.avroSchema = avroSchema;
+
+    List<Types.NestedField> projected = AvroSchemaUtil.convert(avroSchema)
+        .asNestedType()
+        .asStructType()
+        .fields();
+    List<Types.NestedField> full = PartitionFieldSummary.getType().fields();
+
+    // match each incoming field to its position in the full struct by field id
+    this.fromProjectionPos = new int[projected.size()];
+    for (int projectedPos = 0; projectedPos < fromProjectionPos.length; projectedPos += 1) {
+      Types.NestedField projectedField = projected.get(projectedPos);
+
+      int fullPos = -1;
+      for (int candidate = 0; candidate < full.size(); candidate += 1) {
+        if (projectedField.fieldId() == full.get(candidate).fieldId()) {
+          fullPos = candidate;
+        }
+      }
+
+      if (fullPos < 0) {
+        throw new IllegalArgumentException("Cannot find projected field: " + projectedField);
+      }
+
+      fromProjectionPos[projectedPos] = fullPos;
+    }
+  }
+
+  /**
+   * Creates a summary with all fields set.
+   *
+   * @param containsNull value returned by {@link #containsNull()}
+   * @param lowerBound value returned by {@link #lowerBound()}
+   * @param upperBound value returned by {@link #upperBound()}
+   */
+  public GenericPartitionFieldSummary(boolean containsNull, ByteBuffer lowerBound,
+                                      ByteBuffer upperBound) {
+    this.avroSchema = AVRO_SCHEMA;
+    this.fromProjectionPos = null;
+    this.containsNull = containsNull;
+    this.lowerBound = lowerBound;
+    this.upperBound = upperBound;
+  }
+
+  /**
+   * Copy constructor. The bound buffers are shared with the copied summary, not duplicated.
+   *
+   * @param toCopy a generic partition field summary to copy.
+   */
+  private GenericPartitionFieldSummary(GenericPartitionFieldSummary toCopy) {
+    this.avroSchema = toCopy.avroSchema;
+    this.fromProjectionPos = toCopy.fromProjectionPos;
+    this.containsNull = toCopy.containsNull;
+    this.lowerBound = toCopy.lowerBound;
+    this.upperBound = toCopy.upperBound;
+  }
+
+  /**
+   * Constructor for Java serialization.
+   */
+  GenericPartitionFieldSummary() {
+  }
+
+  @Override
+  public boolean containsNull() {
+    return containsNull;
+  }
+
+  @Override
+  public ByteBuffer lowerBound() {
+    return lowerBound;
+  }
+
+  @Override
+  public ByteBuffer upperBound() {
+    return upperBound;
+  }
+
+  @Override
+  public int size() {
+    return PartitionFieldSummary.getType().fields().size();
+  }
+
+  @Override
+  public <T> T get(int pos, Class<T> javaClass) {
+    return javaClass.cast(get(pos));
+  }
+
+  @Override
+  public void put(int i, Object v) {
+    set(i, v);
+  }
+
+  /**
+   * Returns the field at the given ordinal of this record's schema.
+   *
+   * @throws UnsupportedOperationException if the ordinal does not map to a known field
+   */
+  @Override
+  public Object get(int i) {
+    int pos = toFullPosition(i);
+    if (pos == 0) {
+      return containsNull;
+    } else if (pos == 1) {
+      return lowerBound;
+    } else if (pos == 2) {
+      return upperBound;
+    }
+    throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
+  }
+
+  /**
+   * Sets the field at the given ordinal of this record's schema. Unknown ordinals are ignored.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public <T> void set(int i, T value) {
+    switch (toFullPosition(i)) {
+      case 0:
+        this.containsNull = (Boolean) value;
+        break;
+      case 1:
+        this.lowerBound = (ByteBuffer) value;
+        break;
+      case 2:
+        this.upperBound = (ByteBuffer) value;
+        break;
+      default:
+        // ignore the object, it must be from a newer version of the format
+        break;
+    }
+  }
+
+  @Override
+  public PartitionFieldSummary copy() {
+    return new GenericPartitionFieldSummary(this);
+  }
+
+  @Override
+  public Schema getSchema() {
+    return avroSchema;
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+        .add("contains_null", containsNull)
+        .add("lower_bound", lowerBound)
+        .add("upper_bound", upperBound)
+        .toString();
+  }
+
+  // if the schema was projected, map the incoming ordinal to the expected one
+  private int toFullPosition(int ordinal) {
+    return fromProjectionPos != null ? fromProjectionPos[ordinal] : ordinal;
+  }
+}
diff --git a/core/src/main/java/com/netflix/iceberg/ManifestGroup.java b/core/src/main/java/com/netflix/iceberg/ManifestGroup.java
index f343427..19d993f 100644
--- a/core/src/main/java/com/netflix/iceberg/ManifestGroup.java
+++ b/core/src/main/java/com/netflix/iceberg/ManifestGroup.java
@@ -37,18 +37,18 @@ class ManifestGroup {
private static final Types.StructType EMPTY_STRUCT = Types.StructType.of();
private final TableOperations ops;
- private final Set<String> manifests;
+ private final Set<ManifestFile> manifests;
private final Expression dataFilter;
private final Expression fileFilter;
private final boolean ignoreDeleted;
private final List<String> columns;
- ManifestGroup(TableOperations ops, Iterable<String> manifests) {
+ ManifestGroup(TableOperations ops, Iterable<ManifestFile> manifests) {
this(ops, Sets.newHashSet(manifests), Expressions.alwaysTrue(), Expressions.alwaysTrue(),
false, ImmutableList.of("*"));
}
- private ManifestGroup(TableOperations ops, Set<String> manifests,
+ private ManifestGroup(TableOperations ops, Set<ManifestFile> manifests,
Expression dataFilter, Expression fileFilter, boolean ignoreDeleted,
List<String> columns) {
this.ops = ops;
@@ -94,10 +94,20 @@ class ManifestGroup {
Evaluator evaluator = new Evaluator(DataFile.getType(EMPTY_STRUCT), fileFilter);
List<Closeable> toClose = Lists.newArrayList();
+ Iterable<ManifestFile> matchingManifests = manifests;
+
+ if (ignoreDeleted) {
+ // remove any manifests that don't have any existing or added files. if either the added or
+ // existing files count is missing, the manifest must be scanned.
+ matchingManifests = Iterables.filter(manifests, manifest ->
+ manifest.addedFilesCount() == null || manifest.existingFilesCount() == null ||
+ manifest.addedFilesCount() + manifest.existingFilesCount() > 0);
+ }
+
Iterable<Iterable<ManifestEntry>> readers = Iterables.transform(
- manifests,
+ matchingManifests,
manifest -> {
- ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest));
+ ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()));
FilteredManifest filtered = reader.filterRows(dataFilter).select(columns);
toClose.add(reader);
return Iterables.filter(
diff --git a/core/src/main/java/com/netflix/iceberg/ManifestListWriter.java b/core/src/main/java/com/netflix/iceberg/ManifestListWriter.java
new file mode 100644
index 0000000..98cdbbf
--- /dev/null
+++ b/core/src/main/java/com/netflix/iceberg/ManifestListWriter.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+import com.google.common.collect.ImmutableMap;
+import com.netflix.iceberg.avro.Avro;
+import com.netflix.iceberg.exceptions.RuntimeIOException;
+import com.netflix.iceberg.io.FileAppender;
+import com.netflix.iceberg.io.OutputFile;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+class ManifestListWriter implements FileAppender<ManifestFile> { // FileAppender that forwards every call to an Avro appender of ManifestFile records
+ private final FileAppender<ManifestFile> writer;
+
+ ManifestListWriter(OutputFile snapshotFile, long snapshotId, Long parentSnapshotId) { // both ids are passed as string key/value metadata to the Avro write builder
+ this.writer = newAppender(snapshotFile, ImmutableMap.of(
+ "snapshot-id", String.valueOf(snapshotId),
+ "parent-snapshot-id", String.valueOf(parentSnapshotId))); // String.valueOf(Object) applies to the boxed Long, so a null parent becomes the text "null"
+ }
+
+ @Override
+ public void add(ManifestFile file) { // forwards a single entry to the wrapped appender
+ writer.add(file);
+ }
+
+ @Override
+ public void addAll(Iterator<ManifestFile> values) { // forwards to the wrapped appender
+ writer.addAll(values);
+ }
+
+ @Override
+ public void addAll(Iterable<ManifestFile> values) { // forwards to the wrapped appender
+ writer.addAll(values);
+ }
+
+ @Override
+ public Metrics metrics() { // metrics come straight from the wrapped appender
+ return writer.metrics();
+ }
+
+ @Override
+ public void close() throws IOException { // closes the wrapped appender
+ writer.close();
+ }
+
+ private static FileAppender<ManifestFile> newAppender(OutputFile file, Map<String, String> meta) { // builds the Avro appender using ManifestFile.schema() and the given metadata map
+ try {
+ return Avro.write(file)
+ .schema(ManifestFile.schema())
+ .named("manifest_file")
+ .meta(meta)
+ .build();
+
+ } catch (IOException e) { // rethrown unchecked so the constructor does not declare IOException
+ throw new RuntimeIOException(e, "Failed to create snapshot list writer for path: " + file);
+ }
+ }
+}
diff --git a/core/src/main/java/com/netflix/iceberg/ManifestWriter.java b/core/src/main/java/com/netflix/iceberg/ManifestWriter.java
index 28ba831..a59c100 100644
--- a/core/src/main/java/com/netflix/iceberg/ManifestWriter.java
+++ b/core/src/main/java/com/netflix/iceberg/ManifestWriter.java
@@ -19,6 +19,7 @@
package com.netflix.iceberg;
+import com.google.common.base.Preconditions;
import com.netflix.iceberg.avro.Avro;
import com.netflix.iceberg.exceptions.RuntimeIOException;
import com.netflix.iceberg.io.FileAppender;
@@ -35,14 +36,27 @@ import static com.netflix.iceberg.ManifestEntry.Status.DELETED;
class ManifestWriter implements FileAppender<DataFile> {
private static final Logger LOG = LoggerFactory.getLogger(ManifestWriter.class);
+ private final String location;
+ private final OutputFile file;
+ private final int specId;
private final FileAppender<ManifestEntry> writer;
private final long snapshotId;
- private ManifestEntry reused = null;
+ private final ManifestEntry reused;
+ private final PartitionSummary stats;
+
+ private boolean closed = false;
+ private int addedFiles = 0;
+ private int existingFiles = 0;
+ private int deletedFiles = 0;
ManifestWriter(PartitionSpec spec, OutputFile file, long snapshotId) {
+ this.location = file.location();
+ this.file = file;
+ this.specId = spec.specId();
this.writer = newAppender(FileFormat.AVRO, spec, file);
this.snapshotId = snapshotId;
this.reused = new ManifestEntry(spec.partitionType());
+ this.stats = new PartitionSummary(spec);
}
public void addExisting(Iterable<ManifestEntry> entries) {
@@ -54,25 +68,37 @@ class ManifestWriter implements FileAppender<DataFile> {
}
public void addExisting(ManifestEntry entry) {
- writer.add(reused.wrapExisting(entry.snapshotId(), entry.file()));
+ add(reused.wrapExisting(entry.snapshotId(), entry.file()));
}
public void addExisting(long snapshotId, DataFile file) {
- writer.add(reused.wrapExisting(snapshotId, file));
+ add(reused.wrapExisting(snapshotId, file));
}
public void delete(ManifestEntry entry) {
// Use the current Snapshot ID for the delete. It is safe to delete the data file from disk
// when this Snapshot has been removed or when there are no Snapshots older than this one.
- writer.add(reused.wrapDelete(snapshotId, entry.file()));
+ add(reused.wrapDelete(snapshotId, entry.file()));
}
public void delete(DataFile file) {
- writer.add(reused.wrapDelete(snapshotId, file));
+ add(reused.wrapDelete(snapshotId, file));
}
- public void add(ManifestEntry file) {
- writer.add(file);
+ public void add(ManifestEntry entry) {
+ switch (entry.status()) {
+ case ADDED:
+ addedFiles += 1;
+ break;
+ case EXISTING:
+ existingFiles += 1;
+ break;
+ case DELETED:
+ deletedFiles += 1;
+ break;
+ }
+ stats.update(entry.file().partition());
+ writer.add(entry);
}
public void addEntries(Iterable<ManifestEntry> entries) {
@@ -85,7 +111,7 @@ class ManifestWriter implements FileAppender<DataFile> {
public void add(DataFile file) {
// TODO: this assumes that file is a GenericDataFile that can be written directly to Avro
// Eventually, this should check in case there are other DataFile implementations.
- writer.add(reused.wrapAppend(snapshotId, file));
+ add(reused.wrapAppend(snapshotId, file));
}
@Override
@@ -93,8 +119,15 @@ class ManifestWriter implements FileAppender<DataFile> {
return writer.metrics();
}
+ public ManifestFile toManifestFile() { // describes the written manifest: path, length, spec id, snapshot id, file counts, partition summaries
+ Preconditions.checkState(closed, "Cannot build ManifestFile, writer is not closed"); // IllegalStateException until close() has set the closed flag
+ return new GenericManifestFile(location, file.toInputFile().getLength(), specId, snapshotId, // length is read back from the output file
+ addedFiles, existingFiles, deletedFiles, stats.summaries());
+ }
+
@Override
public void close() throws IOException {
+ this.closed = true;
writer.close();
}
diff --git a/core/src/main/java/com/netflix/iceberg/MergingSnapshotUpdate.java b/core/src/main/java/com/netflix/iceberg/MergingSnapshotUpdate.java
index f12ba04..8878d4c 100644
--- a/core/src/main/java/com/netflix/iceberg/MergingSnapshotUpdate.java
+++ b/core/src/main/java/com/netflix/iceberg/MergingSnapshotUpdate.java
@@ -25,7 +25,6 @@ import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import com.google.common.io.Closeables;
import com.netflix.iceberg.ManifestEntry.Status;
import com.netflix.iceberg.exceptions.RuntimeIOException;
import com.netflix.iceberg.exceptions.ValidationException;
@@ -34,7 +33,6 @@ import com.netflix.iceberg.expressions.Expression;
import com.netflix.iceberg.expressions.Expressions;
import com.netflix.iceberg.expressions.Projections;
import com.netflix.iceberg.expressions.StrictMetricsEvaluator;
-import com.netflix.iceberg.io.InputFile;
import com.netflix.iceberg.io.OutputFile;
import com.netflix.iceberg.util.BinPacking.ListPacker;
import com.netflix.iceberg.util.CharSequenceWrapper;
@@ -42,15 +40,12 @@ import com.netflix.iceberg.util.StructLikeWrapper;
import com.netflix.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.Array;
-import java.util.Collections;
-import java.util.Iterator;
+import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import static com.google.common.collect.Iterables.filter;
@@ -64,7 +59,6 @@ import static com.netflix.iceberg.util.ThreadPools.getWorkerPool;
abstract class MergingSnapshotUpdate extends SnapshotUpdate {
private final Logger LOG = LoggerFactory.getLogger(getClass());
- private static final long SIZE_PER_FILE = 100; // assume each file will be ~100 bytes
private static final Joiner COMMA = Joiner.on(",");
protected static class DeleteException extends ValidationException {
@@ -94,14 +88,18 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
private boolean failAnyDelete = false;
private boolean failMissingDeletePaths = false;
+ // cache the new manifest once it is written
+ private ManifestFile newManifest = null;
+ private boolean hasNewFiles = false;
+
// cache merge results to reuse when retrying
- private final Map<List<String>, String> mergeManifests = Maps.newConcurrentMap();
+ private final Map<List<ManifestFile>, ManifestFile> mergeManifests = Maps.newConcurrentMap();
// cache filtered manifests to avoid extra work when commits fail.
- private final Map<String, ManifestReader> filteredManifests = Maps.newConcurrentMap();
+ private final Map<ManifestFile, ManifestFile> filteredManifests = Maps.newConcurrentMap();
// tracking where files were deleted to validate retries quickly
- private final Map<String, Set<CharSequenceWrapper>> filteredManifestToDeletedFiles =
+ private final Map<ManifestFile, Set<CharSequenceWrapper>> filteredManifestToDeletedFiles =
Maps.newConcurrentMap();
private boolean filterUpdated = false; // used to clear caches of filtered and merged manifests
@@ -169,77 +167,70 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
* Add a file to the new snapshot.
*/
protected void add(DataFile file) {
+ hasNewFiles = true;
newFiles.add(file);
}
@Override
- public List<String> apply(TableMetadata base) {
+ public List<ManifestFile> apply(TableMetadata base) {
if (filterUpdated) {
- cleanUncommittedFilters(EMPTY_SET);
+ cleanUncommittedFilters(SnapshotUpdate.EMPTY_SET);
this.filterUpdated = false;
}
Snapshot current = base.currentSnapshot();
- List<PartitionSpec> specs = Lists.newArrayList();
- List<List<ManifestReader>> groups = Lists.newArrayList();
+ Map<Integer, List<ManifestFile>> groups = Maps.newTreeMap(Comparator.<Integer>reverseOrder());
// use a common metrics evaluator for all manifests because it is bound to the table schema
StrictMetricsEvaluator metricsEvaluator = new StrictMetricsEvaluator(
ops.current().schema(), deleteExpression);
// add the current spec as the first group. files are added to the beginning.
- if (newFiles.size() > 0) {
- specs.add(spec);
- groups.add(Lists.newArrayList());
- groups.get(0).add(newFilesAsManifest());
- }
-
- ConcurrentLinkedQueue<ManifestReader> toClose = new ConcurrentLinkedQueue<>();
- boolean threw = true;
try {
+ if (newFiles.size() > 0) {
+ ManifestFile newManifest = newFilesAsManifest();
+ List<ManifestFile> manifestGroup = Lists.newArrayList();
+ manifestGroup.add(newManifest);
+ groups.put(newManifest.partitionSpecId(), manifestGroup);
+ }
+
Set<CharSequenceWrapper> deletedFiles = Sets.newHashSet();
// group manifests by compatible partition specs to be merged
if (current != null) {
- List<String> manifests = current.manifests();
- ManifestReader[] readers = new ManifestReader[manifests.size()];
+ List<ManifestFile> manifests = current.manifests();
+ ManifestFile[] filtered = new ManifestFile[manifests.size()];
// open all of the manifest files in parallel, use index to avoid reordering
- Tasks.range(readers.length)
+ Tasks.range(filtered.length)
.stopOnFailure().throwFailureWhenFinished()
.executeWith(getWorkerPool())
.run(index -> {
- ManifestReader manifest = filterManifest(
- deleteExpression, metricsEvaluator, ops.newInputFile(manifests.get(index)));
- readers[index] = manifest;
- toClose.add(manifest);
- });
-
- for (ManifestReader reader : readers) {
- if (reader.file() != null) {
- String location = reader.file().location();
- Set<CharSequenceWrapper> manifestDeletes = filteredManifestToDeletedFiles.get(location);
- if (manifestDeletes != null) {
- deletedFiles.addAll(manifestDeletes);
- }
+ ManifestFile manifest = filterManifest(
+ deleteExpression, metricsEvaluator,
+ manifests.get(index));
+ filtered[index] = manifest;
+ }, IOException.class);
+
+ for (ManifestFile manifest : filtered) {
+ Set<CharSequenceWrapper> manifestDeletes = filteredManifestToDeletedFiles.get(manifest);
+ if (manifestDeletes != null) {
+ deletedFiles.addAll(manifestDeletes);
}
- int index = findMatch(specs, reader.spec());
- if (index < 0) {
- // not found, add a new one
- List<ManifestReader> newList = Lists.<ManifestReader>newArrayList(reader);
- specs.add(reader.spec());
- groups.add(newList);
+ List<ManifestFile> group = groups.get(manifest.partitionSpecId());
+ if (group != null) {
+ group.add(manifest);
} else {
- // replace the reader spec with the later one
- specs.set(index, reader.spec());
- groups.get(index).add(reader);
+ group = Lists.newArrayList();
+ group.add(manifest);
+ groups.put(manifest.partitionSpecId(), group);
}
}
}
- List<String> manifests = Lists.newArrayList();
- for (int i = 0; i < specs.size(); i += 1) {
- for (String manifest : mergeGroup(specs.get(i), groups.get(i))) {
+ List<ManifestFile> manifests = Lists.newArrayList();
+ for (Map.Entry<Integer, List<ManifestFile>> entry : groups.entrySet()) {
+ for (ManifestFile manifest : mergeGroup(entry.getKey(), entry.getValue())) {
manifests.add(manifest);
}
}
@@ -250,52 +241,56 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
path -> !deletedFiles.contains(path)),
CharSequenceWrapper::get)));
- threw = false;
-
return manifests;
- } finally {
- for (ManifestReader reader : toClose) {
- try {
- Closeables.close(reader, threw);
- } catch (IOException e) {
- throw new RuntimeIOException(e);
- }
- }
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Failed to create snapshot manifest list");
}
}
- private void cleanUncommittedMerges(Set<String> committed) {
- List<Map.Entry<List<String>, String>> entries = Lists.newArrayList(mergeManifests.entrySet());
- for (Map.Entry<List<String>, String> entry : entries) {
+ private void cleanUncommittedMerges(Set<ManifestFile> committed) {
+ // iterate over a copy of entries to avoid concurrent modification
+ List<Map.Entry<List<ManifestFile>, ManifestFile>> entries =
+ Lists.newArrayList(mergeManifests.entrySet());
+
+ for (Map.Entry<List<ManifestFile>, ManifestFile> entry : entries) {
// delete any new merged manifests that aren't in the committed list
- String merged = entry.getValue();
+ ManifestFile merged = entry.getValue();
if (!committed.contains(merged)) {
- deleteFile(merged);
+ deleteFile(merged.path());
// remove the deleted file from the cache
mergeManifests.remove(entry.getKey());
}
}
}
- private void cleanUncommittedFilters(Set<String> committed) {
- List<Map.Entry<String, ManifestReader>> filterEntries = Lists.newArrayList(filteredManifests.entrySet());
- for (Map.Entry<String, ManifestReader> entry : filterEntries) {
+ private void cleanUncommittedFilters(Set<ManifestFile> committed) {
+ // iterate over a copy of entries to avoid concurrent modification
+ List<Map.Entry<ManifestFile, ManifestFile>> filterEntries =
+ Lists.newArrayList(filteredManifests.entrySet());
+
+ for (Map.Entry<ManifestFile, ManifestFile> entry : filterEntries) {
// remove any new filtered manifests that aren't in the committed list
- String manifest = entry.getKey();
- ManifestReader filtered = entry.getValue();
- if (filtered != null) {
- String location = filtered.file().location();
- if (!manifest.equals(location) && !committed.contains(location)) {
- filteredManifests.remove(manifest);
- deleteFile(location);
+ ManifestFile manifest = entry.getKey();
+ ManifestFile filtered = entry.getValue();
+ if (!committed.contains(filtered)) {
+ // only delete if the filtered copy was created
+ if (!manifest.equals(filtered)) {
+ deleteFile(filtered.path());
}
+
+ // remove the entry from the cache
+ filteredManifests.remove(manifest);
}
}
}
@Override
- protected void cleanUncommitted(Set<String> committed) {
+ protected void cleanUncommitted(Set<ManifestFile> committed) { // deletes the manifests this update wrote that are not in the committed set
+ if (newManifest != null && !committed.contains(newManifest)) { // newManifest is null when no files were added or after a previous cleanup
+ deleteFile(newManifest.path());
+ this.newManifest = null; // drop the cached reference so the deleted file is not reused
+ }
cleanUncommittedMerges(committed);
cleanUncommittedFilters(committed);
}
@@ -308,22 +303,20 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
/**
* @return a ManifestReader that is a filtered version of the input manifest.
*/
- private ManifestReader filterManifest(Expression deleteExpression,
+ private ManifestFile filterManifest(Expression deleteExpression,
StrictMetricsEvaluator metricsEvaluator,
- InputFile manifest) {
- ManifestReader cached = filteredManifests.get(manifest.location());
+ ManifestFile manifest) throws IOException {
+ ManifestFile cached = filteredManifests.get(manifest);
if (cached != null) {
return cached;
}
- ManifestReader reader = ManifestReader.read(manifest);
-
if (nothingToFilter()) {
- filteredManifests.put(manifest.location(), reader);
- return reader;
+ filteredManifests.put(manifest, manifest);
+ return manifest;
}
- try {
+ try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()))) {
Expression inclusiveExpr = Projections
.inclusive(reader.spec())
.project(deleteExpression);
@@ -344,42 +337,35 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
// manifest without copying data. if a manifest does have a file to remove, this will break
// out of the loop and move on to filtering the manifest.
boolean hasDeletedFiles = false;
- Iterator<ManifestEntry> entries = reader.entries().iterator();
- try {
- while (entries.hasNext()) {
- ManifestEntry entry = entries.next();
- DataFile file = entry.file();
- boolean fileDelete = (deletePaths.contains(pathWrapper.set(file.path())) ||
- dropPartitions.contains(partitionWrapper.set(file.partition())));
- if (fileDelete || inclusive.eval(file.partition())) {
- ValidationException.check(
- fileDelete || strict.eval(file.partition()) || metricsEvaluator.eval(file),
- "Cannot delete file where some, but not all, rows match filter %s: %s",
- this.deleteExpression, file.path());
-
- hasDeletedFiles = true;
- if (failAnyDelete) {
- throw new DeleteException(writeSpec().partitionToPath(file.partition()));
- }
- break; // as soon as a deleted file is detected, stop scanning
+ for (ManifestEntry entry : reader.entries()) {
+ DataFile file = entry.file();
+ boolean fileDelete = (deletePaths.contains(pathWrapper.set(file.path())) ||
+ dropPartitions.contains(partitionWrapper.set(file.partition())));
+ if (fileDelete || inclusive.eval(file.partition())) {
+ ValidationException.check(
+ fileDelete || strict.eval(file.partition()) || metricsEvaluator.eval(file),
+ "Cannot delete file where some, but not all, rows match filter %s: %s",
+ this.deleteExpression, file.path());
+
+ hasDeletedFiles = true;
+ if (failAnyDelete) {
+ throw new DeleteException(writeSpec().partitionToPath(file.partition()));
}
- }
- } finally {
- // the loop may have exited early. ensure the iterator is closed.
- if (entries instanceof Closeable) {
- ((Closeable) entries).close();
+ break; // as soon as a deleted file is detected, stop scanning
}
}
if (!hasDeletedFiles) {
- return reader;
+ filteredManifests.put(manifest, manifest);
+ return manifest;
}
// when this point is reached, there is at least one file that will be deleted in the
// manifest. produce a copy of the manifest with all deleted files removed.
Set<CharSequenceWrapper> deletedPaths = Sets.newHashSet();
OutputFile filteredCopy = manifestPath(manifestCount.getAndIncrement());
- try (ManifestWriter writer = new ManifestWriter(reader.spec(), filteredCopy, snapshotId())) {
+ ManifestWriter writer = new ManifestWriter(reader.spec(), filteredCopy, snapshotId());
+ try {
for (ManifestEntry entry : reader.entries()) {
DataFile file = entry.file();
boolean fileDelete = (deletePaths.contains(pathWrapper.set(file.path())) ||
@@ -396,7 +382,7 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
CharSequenceWrapper wrapper = CharSequenceWrapper.wrap(entry.file().path());
if (deletedPaths.contains(wrapper)) {
LOG.warn("Deleting a duplicate path from manifest {}: {}",
- manifest.location(), wrapper.get());
+ manifest.path(), wrapper.get());
}
deletedPaths.add(wrapper);
@@ -405,96 +391,79 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
}
}
}
+ } finally {
+ writer.close();
}
- // close the reader now that it is no longer used and will not be returned
- reader.close();
-
// return the filtered manifest as a reader
- ManifestReader filtered = ManifestReader.read(ops.newInputFile(filteredCopy.location()));
+ ManifestFile filtered = writer.toManifestFile();
// update caches
- filteredManifests.put(manifest.location(), filtered);
- filteredManifestToDeletedFiles.put(filteredCopy.location(), deletedPaths);
+ filteredManifests.put(manifest, filtered);
+ filteredManifestToDeletedFiles.put(filtered, deletedPaths);
return filtered;
-
- } catch (IOException e) {
- throw new RuntimeIOException(e, "Failed to filter manifest: %s", reader.file().location());
}
}
@SuppressWarnings("unchecked")
- private Iterable<String> mergeGroup(PartitionSpec groupSpec, List<ManifestReader> group) {
+ private Iterable<ManifestFile> mergeGroup(int specId, List<ManifestFile> group)
+ throws IOException {
// use a lookback of 1 to avoid reordering the manifests. using 1 also means this should pack
// from the end so that the manifest that gets under-filled is the first one, which will be
// merged the next time.
- long newFilesSize = newFiles.size() * SIZE_PER_FILE;
- ListPacker<ManifestReader> packer = new ListPacker<>(manifestTargetSizeBytes, 1);
- List<List<ManifestReader>> bins = packer.packEnd(group,
- reader -> reader.file() != null ? reader.file().getLength() : newFilesSize);
+ ListPacker<ManifestFile> packer = new ListPacker<>(manifestTargetSizeBytes, 1);
+ List<List<ManifestFile>> bins = packer.packEnd(group, manifest -> manifest.length());
// process bins in parallel, but put results in the order of the bins into an array to preserve
// the order of manifests and contents. preserving the order helps avoid random deletes when
// data files are eventually aged off.
- List<String>[] binResults = (List<String>[]) Array.newInstance(List.class, bins.size());
+ List<ManifestFile>[] binResults = (List<ManifestFile>[])
+ Array.newInstance(List.class, bins.size());
Tasks.range(bins.size())
.stopOnFailure().throwFailureWhenFinished()
.executeWith(getWorkerPool())
.run(index -> {
- List<ManifestReader> bin = bins.get(index);
- List<String> outputManifests = Lists.newArrayList();
+ List<ManifestFile> bin = bins.get(index);
+ List<ManifestFile> outputManifests = Lists.newArrayList();
binResults[index] = outputManifests;
- if (bin.size() == 1 && bin.get(0).file() != null) {
+ if (bin.size() == 1) {
// no need to rewrite
- outputManifests.add(bin.get(0).file().location());
+ outputManifests.add(bin.get(0));
return;
}
- boolean hasInMemoryManifest = false;
- for (ManifestReader reader : bin) {
- if (reader.file() == null) {
- hasInMemoryManifest = true;
- }
- }
-
- // if the bin has an in-memory manifest (the new data) then only merge it if the number of
+ // if the bin has a new manifest (the new data files) then only merge it if the number of
// manifests is above the minimum count. this is applied only to bins with an in-memory
// manifest so that large manifests don't prevent merging older groups.
- if (hasInMemoryManifest && bin.size() < minManifestsCountToMerge) {
- for (ManifestReader reader : bin) {
- if (reader.file() != null) {
- outputManifests.add(reader.file().location());
- } else {
- // write the in-memory manifest
- outputManifests.add(createManifest(groupSpec, Collections.singletonList(reader)));
- }
- }
+ if (bin.contains(newManifest) && bin.size() < minManifestsCountToMerge) {
+ // not enough to merge, add all manifest files to the output list
+ outputManifests.addAll(bin);
} else {
- outputManifests.add(createManifest(groupSpec, bin));
+ // merge the group
+ outputManifests.add(createManifest(specId, bin));
}
- });
+ }, IOException.class);
return Iterables.concat(binResults);
}
- // NOTE: This assumes that any files that are added are in an in-memory manifest.
- private String createManifest(PartitionSpec binSpec, List<ManifestReader> bin) {
- List<String> key = cacheKey(bin);
+ private ManifestFile createManifest(int specId, List<ManifestFile> bin) throws IOException {
// if this merge was already rewritten, use the existing file.
- // if the new files are in this merge, the key is based on the number of new files so files
- // added after the last merge will cause a cache miss.
- if (mergeManifests.containsKey(key)) {
- return mergeManifests.get(key);
+ // if the new files are in this merge, then the ManifestFile for the new files has changed and
+ // will be a cache miss.
+ if (mergeManifests.containsKey(bin)) {
+ return mergeManifests.get(bin);
}
OutputFile out = manifestPath(manifestCount.getAndIncrement());
- try (ManifestWriter writer = new ManifestWriter(binSpec, out, snapshotId())) {
+ ManifestWriter writer = new ManifestWriter(ops.current().spec(specId), out, snapshotId());
+ try {
- for (ManifestReader reader : bin) {
- if (reader.file() != null) {
+ for (ManifestFile manifest : bin) {
+ try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()))) {
for (ManifestEntry entry : reader.entries()) {
if (entry.status() == Status.DELETED) {
// suppress deletes from previous snapshots. only files deleted by this snapshot
@@ -502,72 +471,49 @@ abstract class MergingSnapshotUpdate extends SnapshotUpdate {
if (entry.snapshotId() == snapshotId()) {
writer.add(entry);
}
+ } else if (entry.status() == Status.ADDED && entry.snapshotId() == snapshotId()) {
+ // adds from this snapshot are still adds, otherwise they should be existing
+ writer.add(entry);
} else {
// add all files from the old manifest as existing files
writer.addExisting(entry);
}
}
- } else {
- // if the files are in an in-memory manifest, then they are new
- writer.addEntries(reader.entries());
}
}
- } catch (IOException e) {
- throw new RuntimeIOException(e, "Failed to write manifest: %s", out);
+ } finally {
+ writer.close();
}
- // update the cache
- mergeManifests.put(key, out.location());
+ ManifestFile manifest = writer.toManifestFile();
- return out.location();
- }
+ // update the cache
+ mergeManifests.put(bin, manifest);
- private ManifestReader newFilesAsManifest() {
- long id = snapshotId();
- ManifestEntry reused = new ManifestEntry(spec.partitionType());
- return ManifestReader.inMemory(spec,
- transform(newFiles, file -> {
- reused.wrapAppend(id, file);
- return reused;
- }));
+ return manifest;
}
- private List<String> cacheKey(List<ManifestReader> group) {
- List<String> key = Lists.newArrayList();
-
- for (ManifestReader reader : group) {
- if (reader.file() != null) {
- key.add(reader.file().location());
- } else {
- // if the file is null, this is an in-memory reader
- // use the size to avoid collisions if retries have added files
- key.add("append-" + newFiles.size() + "-files");
- }
+ private ManifestFile newFilesAsManifest() throws IOException {
+ if (hasNewFiles && newManifest != null) {
+ deleteFile(newManifest.path());
+ newManifest = null;
}
- return key;
- }
+ if (newManifest == null) {
+ OutputFile out = manifestPath(manifestCount.getAndIncrement());
- /**
- * Helper method to group manifests by compatible partition spec.
- * <p>
- * When a match is found, this will replace the current spec for the group with the query spec.
- * This is to produce manifests with the latest compatible spec.
- *
- * @param specs a list of partition specs, corresponding to the groups of readers
- * @param spec spec to be matched to a group
- * @return group of readers files for this spec can be merged into
- */
- private static int findMatch(List<PartitionSpec> specs,
- PartitionSpec spec) {
- // loop from last to first because later specs are most likely to match
- for (int i = specs.size() - 1; i >= 0; i -= 1) {
- if (specs.get(i).compatibleWith(spec)) {
- return i;
+ ManifestWriter writer = new ManifestWriter(spec, out, snapshotId());
+ try {
+ writer.addAll(newFiles);
+ } finally {
+ writer.close();
}
+
+ this.newManifest = writer.toManifestFile();
+ this.hasNewFiles = false;
}
- return -1;
+ return newManifest;
}
}
diff --git a/core/src/main/java/com/netflix/iceberg/OverwriteData.java b/core/src/main/java/com/netflix/iceberg/OverwriteData.java
index 404b440..3ebb725 100644
--- a/core/src/main/java/com/netflix/iceberg/OverwriteData.java
+++ b/core/src/main/java/com/netflix/iceberg/OverwriteData.java
@@ -52,7 +52,7 @@ public class OverwriteData extends MergingSnapshotUpdate implements OverwriteFil
}
@Override
- public List<String> apply(TableMetadata base) {
+ public List<ManifestFile> apply(TableMetadata base) {
if (validateAddedFiles) {
PartitionSpec spec = writeSpec();
Expression rowFilter = rowFilter();
diff --git a/core/src/main/java/com/netflix/iceberg/PartitionSummary.java b/core/src/main/java/com/netflix/iceberg/PartitionSummary.java
new file mode 100644
index 0000000..52a2b4a
--- /dev/null
+++ b/core/src/main/java/com/netflix/iceberg/PartitionSummary.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+import com.google.common.collect.Lists;
+import com.netflix.iceberg.ManifestFile.PartitionFieldSummary;
+import com.netflix.iceberg.types.Comparators;
+import com.netflix.iceberg.types.Conversions;
+import com.netflix.iceberg.types.Type;
+import com.netflix.iceberg.types.Types;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * Collects summary statistics for the partition tuples passed to {@link #update(StructLike)}.
+ * <p>
+ * One accumulator is kept per field of the spec's partition type. Each accumulator records
+ * whether a null value was seen and the smallest and largest non-null values, ordered by the
+ * comparator for the field's type.
+ */
+class PartitionSummary {
+  private final PartitionFieldStats<?>[] fields;
+  private final Class<?>[] javaClasses;
+
+  /**
+   * Creates an empty accumulator for each field of the spec's partition type.
+   *
+   * @param spec the partition spec whose partition tuples will be summarized
+   */
+  PartitionSummary(PartitionSpec spec) {
+    this.javaClasses = spec.javaClasses();
+    this.fields = new PartitionFieldStats[javaClasses.length];
+    List<Types.NestedField> partitionFields = spec.partitionType().fields();
+    for (int i = 0; i < fields.length; i += 1) {
+      this.fields[i] = new PartitionFieldStats<>(partitionFields.get(i).type());
+    }
+  }
+
+  /**
+   * Returns one summary per partition field, in field order.
+   * <p>
+   * The returned list is a copy taken when this method is called; calls to
+   * {@link #update(StructLike)} made afterwards are not reflected in it.
+   *
+   * @return a list of field summaries for the values seen so far
+   */
+  List<PartitionFieldSummary> summaries() {
+    // Lists.transform produces a lazy view that re-runs toSummary on every element access and
+    // follows later updates. copy it so each bound is converted to a ByteBuffer exactly once.
+    List<PartitionFieldSummary> lazyView =
+        Lists.transform(Arrays.asList(fields), PartitionFieldStats::toSummary);
+    return Lists.newArrayList(lazyView);
+  }
+
+  /**
+   * Folds the values of one partition tuple into the per-field statistics.
+   *
+   * @param partitionKey a partition tuple; position i is read with the i-th Java class of the spec
+   */
+  public void update(StructLike partitionKey) {
+    updateFields(partitionKey);
+  }
+
+  // the unchecked casts pair fields[i] with javaClasses[i]; both arrays are built from the same
+  // spec in the constructor. T is only a device to make the generic calls line up.
+  @SuppressWarnings("unchecked")
+  private <T> void updateFields(StructLike key) {
+    for (int i = 0; i < javaClasses.length; i += 1) {
+      PartitionFieldStats<T> stats = (PartitionFieldStats<T>) fields[i];
+      Class<T> javaClass = (Class<T>) javaClasses[i];
+      stats.update(key.get(i, javaClass));
+    }
+  }
+
+  /**
+   * Running statistics for a single partition field: a null flag plus min and max values.
+   */
+  private static class PartitionFieldStats<T> {
+    private final Type type;
+    private final Comparator<T> comparator;
+
+    private boolean containsNull = false;
+    private T min = null;
+    private T max = null;
+
+    private PartitionFieldStats(Type type) {
+      this.type = type;
+      this.comparator = Comparators.forType(type.asPrimitiveType());
+    }
+
+    /**
+     * Converts the current state to a summary. A bound is null when no non-null value was seen.
+     */
+    public PartitionFieldSummary toSummary() {
+      return new GenericPartitionFieldSummary(containsNull,
+          min != null ? Conversions.toByteBuffer(type, min) : null,
+          max != null ? Conversions.toByteBuffer(type, max) : null);
+    }
+
+    /**
+     * Records one value: null sets the null flag, anything else may widen the min/max range.
+     */
+    void update(T value) {
+      if (value == null) {
+        this.containsNull = true;
+      } else if (min == null) {
+        // first non-null value initializes both bounds
+        this.min = value;
+        this.max = value;
+      } else {
+        if (comparator.compare(value, min) < 0) {
+          this.min = value;
+        }
+        if (comparator.compare(max, value) < 0) {
+          this.max = value;
+        }
+      }
+    }
+  }
+}
diff --git a/core/src/main/java/com/netflix/iceberg/RemoveSnapshots.java b/core/src/main/java/com/netflix/iceberg/RemoveSnapshots.java
index b097674..8f473b3 100644
--- a/core/src/main/java/com/netflix/iceberg/RemoveSnapshots.java
+++ b/core/src/main/java/com/netflix/iceberg/RemoveSnapshots.java
@@ -130,22 +130,22 @@ class RemoveSnapshots implements ExpireSnapshots {
TableMetadata current = ops.refresh();
Set<Long> currentIds = Sets.newHashSet();
- Set<String> currentManifests = Sets.newHashSet();
+ Set<ManifestFile> currentManifests = Sets.newHashSet();
for (Snapshot snapshot : current.snapshots()) {
currentIds.add(snapshot.snapshotId());
currentManifests.addAll(snapshot.manifests());
}
- Set<String> allManifests = Sets.newHashSet(currentManifests);
+ Set<ManifestFile> allManifests = Sets.newHashSet(currentManifests);
Set<String> manifestsToDelete = Sets.newHashSet();
for (Snapshot snapshot : base.snapshots()) {
long snapshotId = snapshot.snapshotId();
if (!currentIds.contains(snapshotId)) {
// the snapshot was removed, find any manifests that are no longer needed
LOG.info("Removing snapshot: {}", snapshot);
- for (String manifest : snapshot.manifests()) {
+ for (ManifestFile manifest : snapshot.manifests()) {
if (!currentManifests.contains(manifest)) {
- manifestsToDelete.add(manifest);
+ manifestsToDelete.add(manifest.path());
allManifests.add(manifest);
}
}
@@ -161,7 +161,7 @@ class RemoveSnapshots implements ExpireSnapshots {
).run(manifest -> {
// even if the manifest is still used, it may contain files that can be deleted
// TODO: eliminate manifests with no deletes without scanning
- try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest))) {
+ try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()))) {
for (ManifestEntry entry : reader.entries()) {
// if the snapshot ID of the DELETE entry is no longer valid, the data can be deleted
if (entry.status() == ManifestEntry.Status.DELETED &&
@@ -171,7 +171,7 @@ class RemoveSnapshots implements ExpireSnapshots {
}
}
} catch (IOException e) {
- throw new RuntimeIOException(e, "Failed to read manifest: " + manifest);
+ throw new RuntimeIOException(e, "Failed to read manifest file: " + manifest.path());
}
});
diff --git a/core/src/main/java/com/netflix/iceberg/ReplacePartitionsOperation.java b/core/src/main/java/com/netflix/iceberg/ReplacePartitionsOperation.java
index 9d0db6c..4fb6aa8 100644
--- a/core/src/main/java/com/netflix/iceberg/ReplacePartitionsOperation.java
+++ b/core/src/main/java/com/netflix/iceberg/ReplacePartitionsOperation.java
@@ -42,7 +42,7 @@ public class ReplacePartitionsOperation extends MergingSnapshotUpdate implements
}
@Override
- public List<String> apply(TableMetadata base) {
+ public List<ManifestFile> apply(TableMetadata base) {
if (writeSpec().fields().size() <= 0) {
// replace all data in an unpartitioned table
deleteByRowFilter(Expressions.alwaysTrue());
diff --git a/core/src/main/java/com/netflix/iceberg/ScanSummary.java b/core/src/main/java/com/netflix/iceberg/ScanSummary.java
index b19ab38..7315786 100644
--- a/core/src/main/java/com/netflix/iceberg/ScanSummary.java
+++ b/core/src/main/java/com/netflix/iceberg/ScanSummary.java
@@ -22,6 +22,7 @@ package com.netflix.iceberg;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -90,11 +91,21 @@ public class ScanSummary {
timeFilters.add(filter);
}
+    /**
+     * Adds a lower time bound given as a timestamp string.
+     * <p>
+     * The string is converted to a timestamp-without-zone literal and its value is divided by
+     * 1000 before being passed to {@link #after(long)} (presumably microseconds to
+     * milliseconds; confirm against the timestamp literal's unit).
+     *
+     * @param timestamp a timestamp string accepted by the literal conversion
+     * @return this builder
+     */
+    public Builder after(String timestamp) {
+      Literal<Long> parsed = Literal.of(timestamp).to(Types.TimestampType.withoutZone());
+      long scaled = parsed.value() / 1000;
+      return after(scaled);
+    }
+
public Builder after(long timestampMillis) {
addTimestampFilter(Expressions.greaterThanOrEqual("timestamp_ms", timestampMillis));
return this;
}
+    /**
+     * Adds an upper time bound given as a timestamp string.
+     * <p>
+     * The string is converted to a timestamp-without-zone literal and its value is divided by
+     * 1000 before being passed to {@link #before(long)} (presumably microseconds to
+     * milliseconds; confirm against the timestamp literal's unit).
+     *
+     * @param timestamp a timestamp string accepted by the literal conversion
+     * @return this builder
+     */
+    public Builder before(String timestamp) {
+      Literal<Long> parsed = Literal.of(timestamp).to(Types.TimestampType.withoutZone());
+      long scaled = parsed.value() / 1000;
+      return before(scaled);
+    }
+
public Builder before(long timestampMillis) {
addTimestampFilter(Expressions.lessThanOrEqual("timestamp_ms", timestampMillis));
return this;
@@ -145,29 +156,45 @@ public class ScanSummary {
removeTimeFilters(filters, Expressions.rewriteNot(scan.filter()));
Expression rowFilter = joinFilters(filters);
- long minTimestamp = Long.MIN_VALUE;
- long maxTimestamp = Long.MAX_VALUE;
+ Iterable<ManifestFile> manifests = table.currentSnapshot().manifests();
+
boolean filterByTimestamp = !timeFilters.isEmpty();
+ Set<Long> snapshotsInTimeRange = Sets.newHashSet();
if (filterByTimestamp) {
Pair<Long, Long> range = timestampRange(timeFilters);
- minTimestamp = range.first();
- maxTimestamp = range.second();
+ long minTimestamp = range.first();
+ long maxTimestamp = range.second();
+
+ for (Map.Entry<Long, Long> entry : snapshotTimestamps.entrySet()) {
+ long snapshotId = entry.getKey();
+ long timestamp = entry.getValue();
+ if (timestamp >= minTimestamp && timestamp <= maxTimestamp) {
+ snapshotsInTimeRange.add(snapshotId);
+ }
+ }
+
+ // when filtering by dateCreated or lastUpdated timestamp, this matches the set of files
+ // that were added in the time range. files are added in new snapshots, so to get the new
+ // files, this only needs to scan new manifests in the set of snapshots that match the
+ // filter. ManifestFile.snapshotId() returns the snapshot when the manifest was added, so
+ // the only manifests that need to be scanned are those with snapshotId() in the timestamp
+ // range, or those that don't have a snapshot ID.
+ manifests = Iterables.filter(manifests, manifest ->
+ manifest.snapshotId() == null || snapshotsInTimeRange.contains(manifest.snapshotId()));
}
- try (CloseableIterable<ManifestEntry> entries =
- new ManifestGroup(ops, table.currentSnapshot().manifests())
- .filterData(rowFilter)
- .ignoreDeleted()
- .select(SCAN_SUMMARY_COLUMNS)
- .entries()) {
+ try (CloseableIterable<ManifestEntry> entries = new ManifestGroup(ops, manifests)
+ .filterData(rowFilter)
+ .ignoreDeleted()
+ .select(SCAN_SUMMARY_COLUMNS)
+ .entries()) {
PartitionSpec spec = table.spec();
for (ManifestEntry entry : entries) {
Long timestamp = snapshotTimestamps.get(entry.snapshotId());
// if filtering, skip timestamps that are outside the range
- if (filterByTimestamp &&
- (timestamp == null || timestamp < minTimestamp || timestamp > maxTimestamp)) {
+ if (filterByTimestamp && !snapshotsInTimeRange.contains(entry.snapshotId())) {
continue;
}
diff --git a/core/src/main/java/com/netflix/iceberg/SnapshotParser.java b/core/src/main/java/com/netflix/iceberg/SnapshotParser.java
index cf04bec..a5ce08c 100644
--- a/core/src/main/java/com/netflix/iceberg/SnapshotParser.java
+++ b/core/src/main/java/com/netflix/iceberg/SnapshotParser.java
@@ -22,8 +22,12 @@ package com.netflix.iceberg;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
import com.netflix.iceberg.exceptions.RuntimeIOException;
import com.netflix.iceberg.util.JsonUtil;
+import com.netflix.iceberg.util.Tasks;
+import com.netflix.iceberg.util.ThreadPools;
import java.io.IOException;
import java.io.StringWriter;
import java.util.List;
@@ -34,19 +38,30 @@ public class SnapshotParser {
private static final String PARENT_SNAPSHOT_ID = "parent-snapshot-id";
private static final String TIMESTAMP_MS = "timestamp-ms";
private static final String MANIFESTS = "manifests";
+ private static final String MANIFEST_LIST = "manifest-list";
- static void toJson(Snapshot snapshot, JsonGenerator generator) throws IOException {
+ /**
+ * Writes a snapshot as a JSON object using the given generator.
+ * <p>
+ * The object always has the snapshot ID and timestamp, and has the parent snapshot ID when it
+ * is not null. Manifests are written in one of two forms: the location of the manifest list
+ * file under "manifest-list" when the snapshot has one, otherwise the manifest paths embedded
+ * as a "manifests" array.
+ *
+ * @param snapshot the snapshot to serialize
+ * @param generator the generator that receives the JSON object
+ * @throws IOException if the generator fails to write
+ */
+ static void toJson(Snapshot snapshot, JsonGenerator generator)
+ throws IOException {
generator.writeStartObject();
generator.writeNumberField(SNAPSHOT_ID, snapshot.snapshotId());
if (snapshot.parentId() != null) {
generator.writeNumberField(PARENT_SNAPSHOT_ID, snapshot.parentId());
}
generator.writeNumberField(TIMESTAMP_MS, snapshot.timestampMillis());
- generator.writeArrayFieldStart(MANIFESTS);
- for (String file : snapshot.manifests()) {
- generator.writeString(file);
+
+ String manifestList = snapshot.manifestListLocation();
+ if (manifestList != null) {
+ // write just the location. manifests should not be embedded in JSON along with a list
+ generator.writeStringField(MANIFEST_LIST, manifestList);
+ } else {
+ // embed the manifest list in the JSON
+ // only each manifest's path is written in this form
+ generator.writeArrayFieldStart(MANIFESTS);
+ for (ManifestFile file : snapshot.manifests()) {
+ generator.writeString(file.path());
+ }
+ generator.writeEndArray();
}
- generator.writeEndArray();
+
generator.writeEndObject();
}
@@ -73,9 +88,19 @@ public class SnapshotParser {
parentId = JsonUtil.getLong(PARENT_SNAPSHOT_ID, node);
}
long timestamp = JsonUtil.getLong(TIMESTAMP_MS, node);
- List<String> manifests = JsonUtil.getStringList(MANIFESTS, node);
- return new BaseSnapshot(ops, versionId, parentId, timestamp, manifests);
+ if (node.has(MANIFEST_LIST)) {
+ // the manifest list is stored in a manifest list file
+ String manifestList = JsonUtil.getString(MANIFEST_LIST, node);
+ return new BaseSnapshot(ops, versionId, parentId, timestamp, ops.newInputFile(manifestList));
+
+ } else {
+ // fall back to an embedded manifest list. pass in the manifest's InputFile so length can be
+ // loaded lazily, if it is needed
+ List<ManifestFile> manifests = Lists.transform(JsonUtil.getStringList(MANIFESTS, node),
+ location -> new GenericManifestFile(ops.newInputFile(location), 0));
+ return new BaseSnapshot(ops, versionId, parentId, timestamp, manifests);
+ }
}
public static Snapshot fromJson(TableOperations ops, String json) {
diff --git a/core/src/main/java/com/netflix/iceberg/SnapshotUpdate.java b/core/src/main/java/com/netflix/iceberg/SnapshotUpdate.java
index 8fe0d81..54c0483 100644
--- a/core/src/main/java/com/netflix/iceberg/SnapshotUpdate.java
+++ b/core/src/main/java/com/netflix/iceberg/SnapshotUpdate.java
@@ -19,13 +19,19 @@
package com.netflix.iceberg;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import com.google.common.collect.Sets;
import com.netflix.iceberg.exceptions.CommitFailedException;
+import com.netflix.iceberg.exceptions.RuntimeIOException;
import com.netflix.iceberg.io.OutputFile;
import com.netflix.iceberg.util.Exceptions;
import com.netflix.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.UUID;
@@ -39,10 +45,28 @@ import static com.netflix.iceberg.TableProperties.COMMIT_NUM_RETRIES;
import static com.netflix.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT;
import static com.netflix.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS;
import static com.netflix.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
+import static com.netflix.iceberg.TableProperties.MANIFEST_LISTS_ENABLED;
+import static com.netflix.iceberg.TableProperties.MANIFEST_LISTS_ENABLED_DEFAULT;
+import static com.netflix.iceberg.util.ThreadPools.getWorkerPool;
abstract class SnapshotUpdate implements PendingUpdate<Snapshot> {
private static final Logger LOG = LoggerFactory.getLogger(SnapshotUpdate.class);
- static final Set<String> EMPTY_SET = Sets.newHashSet();
+ static final Set<ManifestFile> EMPTY_SET = Sets.newHashSet();
+
+ /**
+ * Cache used to enrich ManifestFile instances that are written to a ManifestListWriter.
+ * <p>
+ * A file that already reports a snapshot ID is returned unchanged. Any other file is passed to
+ * addMetadata, which reads the manifest to fill in the missing metadata. The cache is built
+ * without size or expiry settings.
+ */
+ private final LoadingCache<ManifestFile, ManifestFile> manifestsWithMetadata = CacheBuilder
+ .newBuilder()
+ .build(new CacheLoader<ManifestFile, ManifestFile>() {
+ @Override
+ public ManifestFile load(ManifestFile file) {
+ // a non-null snapshot ID is used as the signal that metadata is already present
+ if (file.snapshotId() != null) {
+ return file;
+ }
+ return addMetadata(ops, file);
+ }
+ });
private final TableOperations ops;
private final String commitUUID = UUID.randomUUID().toString();
@@ -60,7 +84,7 @@ abstract class SnapshotUpdate implements PendingUpdate<Snapshot> {
* @param base the base table metadata to apply changes to
* @return a manifest list for the new snapshot.
*/
- protected abstract List<String> apply(TableMetadata base);
+ protected abstract List<ManifestFile> apply(TableMetadata base);
/**
* Clean up any uncommitted manifests that were created.
@@ -72,16 +96,48 @@ abstract class SnapshotUpdate implements PendingUpdate<Snapshot> {
*
* @param committed a set of manifest paths that were actually committed
*/
- protected abstract void cleanUncommitted(Set<String> committed);
+ protected abstract void cleanUncommitted(Set<ManifestFile> committed);
@Override
public Snapshot apply() {
+ // Refreshes table metadata, lets the subclass produce the manifests for the new snapshot, and
+ // returns the pending snapshot. When manifest lists are enabled by table property, the
+ // manifests are written to a separate manifest list file and the snapshot refers to that file;
+ // otherwise the snapshot holds the manifests directly.
this.base = ops.refresh();
- List<String> manifests = apply(base);
- Long currentSnapshotId = base.currentSnapshot() != null ?
+ Long parentSnapshotId = base.currentSnapshot() != null ?
base.currentSnapshot().snapshotId() : null;
- return new BaseSnapshot(ops,
- snapshotId(), currentSnapshotId, System.currentTimeMillis(), manifests);
+
+ List<ManifestFile> manifests = apply(base);
+
+ if (base.propertyAsBoolean(MANIFEST_LISTS_ENABLED, MANIFEST_LISTS_ENABLED_DEFAULT)) {
+ OutputFile manifestList = manifestListPath();
+
+ // write through the OutputFile created above, so the file that is written is the same one
+ // whose location is stored in the snapshot below
+ try (ManifestListWriter writer = new ManifestListWriter(
+ manifestList, snapshotId(), parentSnapshotId)) {
+ ManifestFile[] manifestFiles = new ManifestFile[manifests.size()];
+
+ // load manifest metadata in the worker pool. each task stores its result by index so the
+ // written list keeps the order of the manifests list
+ Tasks.range(manifestFiles.length)
+ .stopOnFailure().throwFailureWhenFinished()
+ .retry(4).exponentialBackoff(
+ base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
+ base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
+ base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
+ 2.0 /* exponential */ )
+ .executeWith(getWorkerPool())
+ .run(index ->
+ manifestFiles[index] = manifestsWithMetadata.getUnchecked(manifests.get(index)));
+
+ writer.addAll(Arrays.asList(manifestFiles));
+
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Failed to write manifest list file");
+ }
+
+ return new BaseSnapshot(ops,
+ snapshotId(), parentSnapshotId, System.currentTimeMillis(),
+ ops.newInputFile(manifestList.location()));
+
+ } else {
+ return new BaseSnapshot(ops,
+ snapshotId(), parentSnapshotId, System.currentTimeMillis(), manifests);
+ }
}
@Override
@@ -123,7 +179,7 @@ abstract class SnapshotUpdate implements PendingUpdate<Snapshot> {
}
} catch (RuntimeException e) {
- LOG.info("Failed to load committed table metadata, skipping manifest clean-up");
+ LOG.info("Failed to load committed table metadata, skipping manifest clean-up", e);
}
}
@@ -135,6 +191,11 @@ abstract class SnapshotUpdate implements PendingUpdate<Snapshot> {
ops.deleteFile(path);
}
+  /**
+   * Creates the metadata output file for this update's manifest list.
+   * <p>
+   * The file name is "snap-", the snapshot ID, "-", and the commit UUID, with the Avro file
+   * extension added.
+   *
+   * @return a new metadata OutputFile for the manifest list
+   */
+  protected OutputFile manifestListPath() {
+    String baseName = String.format("snap-%d-%s", snapshotId(), commitUUID);
+    String fileName = FileFormat.AVRO.addExtension(baseName);
+    return ops.newMetadataFile(fileName);
+  }
+
protected OutputFile manifestPath(int i) {
return ops.newMetadataFile(FileFormat.AVRO.addExtension(commitUUID + "-m" + i));
}
@@ -145,4 +206,52 @@ abstract class SnapshotUpdate implements PendingUpdate<Snapshot> {
}
return snapshotId;
}
+
+  /**
+   * Reads a manifest and returns a new ManifestFile for it with metadata filled in.
+   * <p>
+   * The result keeps the path, length, and partition spec ID of {@code manifest}, and adds the
+   * counts of added, existing, and deleted entries, per-field partition summaries, and a
+   * snapshot ID. The snapshot ID is taken from the first ADDED or DELETED entry; when there is
+   * none, the largest snapshot ID among the entries is used.
+   *
+   * @param ops table operations used to open the manifest and to look up its partition spec
+   * @param manifest the manifest to read
+   * @return a ManifestFile carrying the metadata found by scanning the manifest
+   * @throws RuntimeIOException if reading the manifest fails with an IOException
+   */
+  private static ManifestFile addMetadata(TableOperations ops, ManifestFile manifest) {
+    try (ManifestReader reader = ManifestReader.read(ops.newInputFile(manifest.path()))) {
+      PartitionSummary partitionStats =
+          new PartitionSummary(ops.current().spec(manifest.partitionSpecId()));
+      int numAdded = 0;
+      int numExisting = 0;
+      int numDeleted = 0;
+
+      // snapshot ID of the first ADDED or DELETED entry, if there is one
+      Long firstChangeId = null;
+      long largestSnapshotId = Long.MIN_VALUE;
+
+      for (ManifestEntry entry : reader.entries()) {
+        long entrySnapshotId = entry.snapshotId();
+        if (entrySnapshotId > largestSnapshotId) {
+          largestSnapshotId = entrySnapshotId;
+        }
+
+        switch (entry.status()) {
+          case ADDED:
+            numAdded += 1;
+            if (firstChangeId == null) {
+              firstChangeId = entrySnapshotId;
+            }
+            break;
+          case EXISTING:
+            numExisting += 1;
+            break;
+          case DELETED:
+            numDeleted += 1;
+            if (firstChangeId == null) {
+              firstChangeId = entrySnapshotId;
+            }
+            break;
+        }
+
+        partitionStats.update(entry.file().partition());
+      }
+
+      // if no files were added or deleted, use the largest snapshot ID in the manifest
+      // NOTE(review): a manifest with no entries leaves this at Long.MIN_VALUE -- confirm that
+      // is acceptable for downstream readers
+      Long snapshotId = firstChangeId;
+      if (snapshotId == null) {
+        snapshotId = largestSnapshotId;
+      }
+
+      return new GenericManifestFile(manifest.path(), manifest.length(), manifest.partitionSpecId(),
+          snapshotId, numAdded, numExisting, numDeleted, partitionStats.summaries());
+
+    } catch (IOException e) {
+      throw new RuntimeIOException(e, "Failed to read manifest: %s", manifest.path());
+    }
+  }
}
diff --git a/core/src/main/java/com/netflix/iceberg/TableMetadata.java b/core/src/main/java/com/netflix/iceberg/TableMetadata.java
index 05c3392..c949f13 100644
--- a/core/src/main/java/com/netflix/iceberg/TableMetadata.java
+++ b/core/src/main/java/com/netflix/iceberg/TableMetadata.java
@@ -221,6 +221,14 @@ public class TableMetadata {
return properties;
}
+  /**
+   * Returns a table property parsed as a boolean.
+   *
+   * @param property the property name to look up
+   * @param defaultValue the value to return when the property is not set
+   * @return the result of {@link Boolean#parseBoolean(String)} on the property value, or
+   *         {@code defaultValue} when the property has no value
+   */
+  public boolean propertyAsBoolean(String property, boolean defaultValue) {
+    String value = properties.get(property);
+    return value != null ? Boolean.parseBoolean(value) : defaultValue;
+  }
+
public int propertyAsInt(String property, int defaultValue) {
String value = properties.get(property);
if (value != null) {
diff --git a/core/src/main/java/com/netflix/iceberg/TableProperties.java b/core/src/main/java/com/netflix/iceberg/TableProperties.java
index 6ca09a5..e522f84 100644
--- a/core/src/main/java/com/netflix/iceberg/TableProperties.java
+++ b/core/src/main/java/com/netflix/iceberg/TableProperties.java
@@ -66,4 +66,7 @@ public class TableProperties {
public static final boolean OBJECT_STORE_ENABLED_DEFAULT = false;
public static final String OBJECT_STORE_PATH = "write.object-storage.path";
+
+ public static final String MANIFEST_LISTS_ENABLED = "write.manifest-lists.enabled";
+ public static final boolean MANIFEST_LISTS_ENABLED_DEFAULT = false;
}
diff --git a/core/src/main/java/com/netflix/iceberg/avro/Avro.java b/core/src/main/java/com/netflix/iceberg/avro/Avro.java
index d58bfbd..b08b5ff 100644
--- a/core/src/main/java/com/netflix/iceberg/avro/Avro.java
+++ b/core/src/main/java/com/netflix/iceberg/avro/Avro.java
@@ -22,7 +22,6 @@ package com.netflix.iceberg.avro;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.netflix.iceberg.SchemaParser;
-import com.netflix.iceberg.io.CloseableIterable;
import com.netflix.iceberg.io.InputFile;
import com.netflix.iceberg.io.OutputFile;
import org.apache.avro.Conversions;
@@ -118,6 +117,11 @@ public class Avro {
return this;
}
+ /**
+ * Adds every entry of the given map to this builder's metadata.
+ *
+ * @param properties key/value pairs to add
+ * @return this builder
+ */
+ public WriteBuilder meta(Map<String, String> properties) {
+ metadata.putAll(properties);
+ return this;
+ }
+
private CodecFactory codec() {
String codec = config.getOrDefault(AVRO_COMPRESSION, AVRO_COMPRESSION_DEFAULT);
try {
diff --git a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopOutputFile.java b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopOutputFile.java
index 9edea0d..2cf23ce 100644
--- a/core/src/main/java/com/netflix/iceberg/hadoop/HadoopOutputFile.java
+++ b/core/src/main/java/com/netflix/iceberg/hadoop/HadoopOutputFile.java
@@ -21,6 +21,7 @@ package com.netflix.iceberg.hadoop;
import com.netflix.iceberg.exceptions.AlreadyExistsException;
import com.netflix.iceberg.exceptions.RuntimeIOException;
+import com.netflix.iceberg.io.InputFile;
import com.netflix.iceberg.io.OutputFile;
import com.netflix.iceberg.io.PositionOutputStream;
import org.apache.hadoop.conf.Configuration;
@@ -81,6 +82,11 @@ public class HadoopOutputFile implements OutputFile {
}
@Override
+ public InputFile toInputFile() {
+ // build the input file from the same path and conf that this output file holds
+ return HadoopInputFile.fromPath(path, conf);
+ }
+
+ @Override
public String toString() {
return location();
}
diff --git a/core/src/test/java/com/netflix/iceberg/LocalTableOperations.java b/core/src/test/java/com/netflix/iceberg/LocalTableOperations.java
new file mode 100644
index 0000000..27a01fc
--- /dev/null
+++ b/core/src/test/java/com/netflix/iceberg/LocalTableOperations.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.netflix.iceberg;
+
+import com.netflix.iceberg.exceptions.RuntimeIOException;
+import com.netflix.iceberg.io.InputFile;
+import com.netflix.iceberg.io.OutputFile;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+
+import static com.netflix.iceberg.Files.localInput;
+
+/**
+ * Test-only TableOperations that supports local file access and nothing else.
+ * <p>
+ * Input files are opened from local paths and metadata files are allocated inside a JUnit
+ * TemporaryFolder. The table metadata methods (current, refresh, commit) and newSnapshotId
+ * throw UnsupportedOperationException.
+ */
+class LocalTableOperations implements TableOperations {
+ private final TemporaryFolder temp;
+
+ LocalTableOperations(TemporaryFolder temp) {
+ this.temp = temp;
+ }
+
+ @Override
+ public TableMetadata current() {
+ throw new UnsupportedOperationException("Not implemented for tests");
+ }
+
+ @Override
+ public TableMetadata refresh() {
+ throw new UnsupportedOperationException("Not implemented for tests");
+ }
+
+ @Override
+ public void commit(TableMetadata base, TableMetadata metadata) {
+ throw new UnsupportedOperationException("Not implemented for tests");
+ }
+
+ @Override
+ public InputFile newInputFile(String path) {
+ return localInput(path);
+ }
+
+ @Override
+ public OutputFile newMetadataFile(String filename) {
+ try {
+ // newFile allocates the file in the temp folder; it is deleted right away so that only the
+ // path is handed out (presumably so the writer can create the file itself -- confirm
+ // against Files.localOutput)
+ File metadataFile = temp.newFile(filename);
+ metadataFile.delete();
+ metadataFile.deleteOnExit();
+ return Files.localOutput(metadataFile);
+ } catch (IOException e) {
+ throw new RuntimeIOException(e);
+ }
+ }
+
+ @Override
+ public void deleteFile(String path) {
+ // best effort: the boolean result of delete() is ignored
+ new File(path).delete();
+ }
+
+ @Override
+ public long newSnapshotId() {
+ throw new UnsupportedOperationException("Not implemented for tests");
+ }
+}
diff --git a/core/src/test/java/com/netflix/iceberg/TableTestBase.java b/core/src/test/java/com/netflix/iceberg/TableTestBase.java
index cafb440..c723daa 100644
--- a/core/src/test/java/com/netflix/iceberg/TableTestBase.java
+++ b/core/src/test/java/com/netflix/iceberg/TableTestBase.java
@@ -120,23 +120,23 @@ public class TableTestBase {
}
void validateSnapshot(Snapshot old, Snapshot snap, DataFile... newFiles) {
- List<String> oldManifests = old != null ? old.manifests() : ImmutableList.of();
+ List<ManifestFile> oldManifests = old != null ? old.manifests() : ImmutableList.of();
// copy the manifests to a modifiable list and remove the existing manifests
- List<String> newManifests = Lists.newArrayList(snap.manifests());
- for (String oldManifest : oldManifests) {
+ List<ManifestFile> newManifests = Lists.newArrayList(snap.manifests());
+ for (ManifestFile oldManifest : oldManifests) {
Assert.assertTrue("New snapshot should contain old manifests",
newManifests.remove(oldManifest));
}
Assert.assertEquals("Should create 1 new manifest and reuse old manifests",
1, newManifests.size());
- String manifest = newManifests.get(0);
+ ManifestFile manifest = newManifests.get(0);
long id = snap.snapshotId();
Iterator<String> newPaths = paths(newFiles).iterator();
- for (ManifestEntry entry : ManifestReader.read(localInput(manifest)).entries()) {
+ for (ManifestEntry entry : ManifestReader.read(localInput(manifest.path())).entries()) {
DataFile file = entry.file();
Assert.assertEquals("Path should match expected", newPaths.next(), file.path().toString());
Assert.assertEquals("File's snapshot ID should match", id, entry.snapshotId());
@@ -153,9 +153,15 @@ public class TableTestBase {
return paths;
}
+ // Convenience overload: validates the manifest stored at manifest.path() by delegating to the
+ // String-path overload with the same iterators.
+ static void validateManifest(ManifestFile manifest,
+ Iterator<Long> ids,
+ Iterator<DataFile> expectedFiles) {
+ validateManifest(manifest.path(), ids, expectedFiles);
+ }
+
static void validateManifest(String manifest,
- Iterator<Long> ids,
- Iterator<DataFile> expectedFiles) {
+ Iterator<Long> ids,
+ Iterator<DataFile> expectedFiles) {
for (ManifestEntry entry : ManifestReader.read(localInput(manifest)).entries()) {
DataFile file = entry.file();
DataFile expected = expectedFiles.next();
@@ -168,6 +174,13 @@ public class TableTestBase {
Assert.assertFalse("Should find all files in the manifest", expectedFiles.hasNext());
}
+ // Convenience overload: validates the entries of the manifest stored at manifest.path() by
+ // delegating to the String-path overload with the same iterators.
+ static void validateManifestEntries(ManifestFile manifest,
+ Iterator<Long> ids,
+ Iterator<DataFile> expectedFiles,
+ Iterator<ManifestEntry.Status> expectedStatuses) {
+ validateManifestEntries(manifest.path(), ids, expectedFiles, expectedStatuses);
+ }
+
static void validateManifestEntries(String manifest,
Iterator<Long> ids,
Iterator<DataFile> expectedFiles,
@@ -199,7 +212,7 @@ public class TableTestBase {
return Iterators.forArray(files);
}
- static Iterator<DataFile> files(String manifest) {
- return ManifestReader.read(localInput(manifest)).iterator();
+ // Reads the manifest at manifest.path() as a local file and returns an iterator over its
+ // data files. The reader is not closed here.
+ static Iterator<DataFile> files(ManifestFile manifest) {
+ return ManifestReader.read(localInput(manifest.path())).iterator();
}
}
diff --git a/core/src/test/java/com/netflix/iceberg/TestFastAppend.java b/core/src/test/java/com/netflix/iceberg/TestFastAppend.java
index 9cac989..4d9e174 100644
--- a/core/src/test/java/com/netflix/iceberg/TestFastAppend.java
+++ b/core/src/test/java/com/netflix/iceberg/TestFastAppend.java
@@ -54,7 +54,7 @@ public class TestFastAppend extends TableTestBase {
TableMetadata base = readMetadata();
Assert.assertNotNull("Should have a current snapshot", base.currentSnapshot());
- List<String> v2manifests = base.currentSnapshot().manifests();
+ List<ManifestFile> v2manifests = base.currentSnapshot().manifests();
Assert.assertEquals("Should have one existing manifest", 1, v2manifests.size());
// prepare a new append
@@ -80,7 +80,7 @@ public class TestFastAppend extends TableTestBase {
TableMetadata base = readMetadata();
Assert.assertNotNull("Should have a current snapshot", base.currentSnapshot());
- List<String> v3manifests = base.currentSnapshot().manifests();
+ List<ManifestFile> v3manifests = base.currentSnapshot().manifests();
Assert.assertEquals("Should have 2 existing manifests", 2, v3manifests.size());
// prepare a new append
@@ -110,7 +110,7 @@ public class TestFastAppend extends TableTestBase {
TableMetadata base = readMetadata();
Assert.assertNotNull("Should have a current snapshot", base.currentSnapshot());
- List<String> v2manifests = base.currentSnapshot().manifests();
+ List<ManifestFile> v2manifests = base.currentSnapshot().manifests();
Assert.assertEquals("Should have 1 existing manifest", 1, v2manifests.size());
// commit from the stale table
@@ -137,7 +137,7 @@ public class TestFastAppend extends TableTestBase {
TableMetadata base = readMetadata();
Assert.assertNotNull("Should have a current snapshot", base.currentSnapshot());
- List<String> v2manifests = base.currentSnapshot().manifests();
+ List<ManifestFile> v2manifests = base.currentSnapshot().manifests();
Assert.assertEquals("Should have 1 existing manifest", 1, v2manifests.size());
append.commit();
@@ -147,7 +147,7 @@ public class TestFastAppend extends TableTestBase {
// apply was called before the conflicting commit, but the commit was still consistent
validateSnapshot(base.currentSnapshot(), committed.currentSnapshot(), FILE_D);
- List<String> committedManifests = Lists.newArrayList(committed.currentSnapshot().manifests());
+ List<ManifestFile> committedManifests = Lists.newArrayList(committed.currentSnapshot().manifests());
committedManifests.removeAll(base.currentSnapshot().manifests());
Assert.assertEquals("Should reused manifest created by apply",
pending.manifests().get(0), committedManifests.get(0));
@@ -161,13 +161,13 @@ public class TestFastAppend extends TableTestBase {
AppendFiles append = table.newFastAppend().appendFile(FILE_B);
Snapshot pending = append.apply();
- String newManifest = pending.manifests().get(0);
- Assert.assertTrue("Should create new manifest", new File(newManifest).exists());
+ ManifestFile newManifest = pending.manifests().get(0);
+ Assert.assertTrue("Should create new manifest", new File(newManifest.path()).exists());
AssertHelpers.assertThrows("Should retry 4 times and throw last failure",
CommitFailedException.class, "Injected failure", append::commit);
- Assert.assertFalse("Should clean up new manifest", new File(newManifest).exists());
+ Assert.assertFalse("Should clean up new manifest", new File(newManifest.path()).exists());
}
@Test
@@ -178,15 +178,15 @@ public class TestFastAppend extends TableTestBase {
AppendFiles append = table.newFastAppend().appendFile(FILE_B);
Snapshot pending = append.apply();
- String newManifest = pending.manifests().get(0);
- Assert.assertTrue("Should create new manifest", new File(newManifest).exists());
+ ManifestFile newManifest = pending.manifests().get(0);
+ Assert.assertTrue("Should create new manifest", new File(newManifest.path()).exists());
append.commit();
TableMetadata metadata = readMetadata();
validateSnapshot(null, metadata.currentSnapshot(), FILE_B);
- Assert.assertTrue("Should commit same new manifest", new File(newManifest).exists());
+ Assert.assertTrue("Should commit same new manifest", new File(newManifest.path()).exists());
Assert.assertTrue("Should commit the same new manifest",
metadata.currentSnapshot().manifests().contains(newManifest));
}
diff --git a/core/src/test/java/com/netflix/iceberg/TestMergeAppend.java b/core/src/test/java/com/netflix/iceberg/TestMergeAppend.java
index a1e28bc..6b78c63 100644
--- a/core/src/test/java/com/netflix/iceberg/TestMergeAppend.java
+++ b/core/src/test/java/com/netflix/iceberg/TestMergeAppend.java
@@ -67,7 +67,7 @@ public class TestMergeAppend extends TableTestBase {
long baseId = base.currentSnapshot().snapshotId();
Assert.assertEquals("Should create 1 manifest for initial write",
1, base.currentSnapshot().manifests().size());
- String initialManifest = base.currentSnapshot().manifests().get(0);
+ ManifestFile initialManifest = base.currentSnapshot().manifests().get(0);
Snapshot pending = table.newAppend()
.appendFile(FILE_C)
@@ -76,7 +76,7 @@ public class TestMergeAppend extends TableTestBase {
Assert.assertEquals("Should contain 1 merged manifest for second write",
1, pending.manifests().size());
- String newManifest = pending.manifests().get(0);
+ ManifestFile newManifest = pending.manifests().get(0);
Assert.assertNotEquals("Should not contain manifest from initial write",
initialManifest, newManifest);
@@ -103,7 +103,7 @@ public class TestMergeAppend extends TableTestBase {
long baseId = base.currentSnapshot().snapshotId();
Assert.assertEquals("Should create 1 manifest for initial write",
1, base.currentSnapshot().manifests().size());
- String initialManifest = base.currentSnapshot().manifests().get(0);
+ ManifestFile initialManifest = base.currentSnapshot().manifests().get(0);
table.newDelete()
.deleteFile(FILE_A)
@@ -113,7 +113,7 @@ public class TestMergeAppend extends TableTestBase {
long deleteId = delete.currentSnapshot().snapshotId();
Assert.assertEquals("Should create 1 filtered manifest for delete",
1, delete.currentSnapshot().manifests().size());
- String deleteManifest = delete.currentSnapshot().manifests().get(0);
+ ManifestFile deleteManifest = delete.currentSnapshot().manifests().get(0);
validateManifestEntries(deleteManifest,
ids(deleteId, baseId),
@@ -127,7 +127,7 @@ public class TestMergeAppend extends TableTestBase {
Assert.assertEquals("Should contain 1 merged manifest for second write",
1, pending.manifests().size());
- String newManifest = pending.manifests().get(0);
+ ManifestFile newManifest = pending.manifests().get(0);
Assert.assertNotEquals("Should not contain manifest from initial write",
initialManifest, newManifest);
@@ -168,7 +168,7 @@ public class TestMergeAppend extends TableTestBase {
TableMetadata base = readMetadata();
Assert.assertEquals("Should have 3 unmerged manifests",
3, base.currentSnapshot().manifests().size());
- Set<String> unmerged = Sets.newHashSet(base.currentSnapshot().manifests());
+ Set<ManifestFile> unmerged = Sets.newHashSet(base.currentSnapshot().manifests());
Snapshot pending = table.newAppend()
.appendFile(FILE_D)
@@ -176,7 +176,7 @@ public class TestMergeAppend extends TableTestBase {
Assert.assertEquals("Should contain 1 merged manifest after the 4th write",
1, pending.manifests().size());
- String newManifest = pending.manifests().get(0);
+ ManifestFile newManifest = pending.manifests().get(0);
Assert.assertFalse("Should not contain previous manifests", unmerged.contains(newManifest));
long pendingId = pending.snapshotId();
@@ -204,7 +204,7 @@ public class TestMergeAppend extends TableTestBase {
long baseId = base.currentSnapshot().snapshotId();
Assert.assertEquals("Should create 1 manifest for initial write",
1, base.currentSnapshot().manifests().size());
- String initialManifest = base.currentSnapshot().manifests().get(0);
+ ManifestFile initialManifest = base.currentSnapshot().manifests().get(0);
Snapshot pending = table.newAppend()
.appendFile(FILE_C)
@@ -213,7 +213,7 @@ public class TestMergeAppend extends TableTestBase {
Assert.assertEquals("Should contain 2 unmerged manifests after second write",
2, pending.manifests().size());
- String newManifest = pending.manifests().get(0);
+ ManifestFile newManifest = pending.manifests().get(0);
Assert.assertNotEquals("Should not contain manifest from initial write",
initialManifest, newManifest);
@@ -233,7 +233,7 @@ public class TestMergeAppend extends TableTestBase {
TableMetadata base = readMetadata();
Assert.assertEquals("Should create 1 manifest for initial write",
1, base.currentSnapshot().manifests().size());
- String initialManifest = base.currentSnapshot().manifests().get(0);
+ ManifestFile initialManifest = base.currentSnapshot().manifests().get(0);
// build the new spec using the table's schema, which uses fresh IDs
PartitionSpec newSpec = PartitionSpec.builderFor(base.schema())
@@ -279,7 +279,7 @@ public class TestMergeAppend extends TableTestBase {
TableMetadata base = readMetadata();
Assert.assertEquals("Should contain 2 manifests",
2, base.currentSnapshot().manifests().size());
- String manifest = base.currentSnapshot().manifests().get(0);
+ ManifestFile manifest = base.currentSnapshot().manifests().get(0);
// build the new spec using the table's schema, which uses fresh IDs
PartitionSpec newSpec = PartitionSpec.builderFor(base.schema())
@@ -319,7 +319,7 @@ public class TestMergeAppend extends TableTestBase {
TableMetadata base = readMetadata();
long baseId = base.currentSnapshot().snapshotId();
- String initialManifest = base.currentSnapshot().manifests().get(0);
+ ManifestFile initialManifest = base.currentSnapshot().manifests().get(0);
table.ops().failCommits(5);
@@ -327,9 +327,9 @@ public class TestMergeAppend extends TableTestBase {
Snapshot pending = append.apply();
Assert.assertEquals("Should merge to 1 manifest", 1, pending.manifests().size());
- String newManifest = pending.manifests().get(0);
+ ManifestFile newManifest = pending.manifests().get(0);
- Assert.assertTrue("Should create new manifest", new File(newManifest).exists());
+ Assert.assertTrue("Should create new manifest", new File(newManifest.path()).exists());
validateManifest(newManifest,
ids(pending.snapshotId(), baseId),
concat(files(FILE_B), files(initialManifest)));
@@ -337,7 +337,7 @@ public class TestMergeAppend extends TableTestBase {
AssertHelpers.assertThrows("Should retry 4 times and throw last failure",
CommitFailedException.class, "Injected failure", append::commit);
- Assert.assertFalse("Should clean up new manifest", new File(newManifest).exists());
+ Assert.assertFalse("Should clean up new manifest", new File(newManifest.path()).exists());
}
@Test
@@ -351,7 +351,7 @@ public class TestMergeAppend extends TableTestBase {
TableMetadata base = readMetadata();
long baseId = base.currentSnapshot().snapshotId();
- String initialManifest = base.currentSnapshot().manifests().get(0);
+ ManifestFile initialManifest = base.currentSnapshot().manifests().get(0);
table.ops().failCommits(3);
@@ -359,9 +359,9 @@ public class TestMergeAppend extends TableTestBase {
Snapshot pending = append.apply();
Assert.assertEquals("Should merge to 1 manifest", 1, pending.manifests().size());
- String newManifest = pending.manifests().get(0);
+ ManifestFile newManifest = pending.manifests().get(0);
- Assert.assertTrue("Should create new manifest", new File(newManifest).exists());
+ Assert.assertTrue("Should create new manifest", new File(newManifest.path()).exists());
validateManifest(newManifest,
ids(pending.snapshotId(), baseId),
concat(files(FILE_B), files(initialManifest)));
@@ -369,7 +369,7 @@ public class TestMergeAppend extends TableTestBase {
append.commit();
TableMetadata metadata = readMetadata();
- Assert.assertTrue("Should reuse the new manifest", new File(newManifest).exists());
+ Assert.assertTrue("Should reuse the new manifest", new File(newManifest.path()).exists());
Assert.assertEquals("Should commit the same new manifest during retry",
Lists.newArrayList(newManifest), metadata.currentSnapshot().manifests());
}
diff --git a/core/src/test/java/com/netflix/iceberg/TestReplaceFiles.java b/core/src/test/java/com/netflix/iceberg/TestReplaceFiles.java
index d667294..032b680 100644
--- a/core/src/test/java/com/netflix/iceberg/TestReplaceFiles.java
+++ b/core/src/test/java/com/netflix/iceberg/TestReplaceFiles.java
@@ -87,7 +87,7 @@ public class TestReplaceFiles extends TableTestBase {
long baseSnapshotId = base.currentSnapshot().snapshotId();
Assert.assertEquals("Should create 1 manifest for initial write",
1, base.currentSnapshot().manifests().size());
- String initialManifest = base.currentSnapshot().manifests().get(0);
+ ManifestFile initialManifest = base.currentSnapshot().manifests().get(0);
Snapshot pending = table.newRewrite()
.rewriteFiles(Sets.newSet(FILE_A), Sets.newSet(FILE_C))
@@ -127,7 +127,7 @@ public class TestReplaceFiles extends TableTestBase {
long baseSnapshotId = base.currentSnapshot().snapshotId();
Assert.assertEquals("Should create 1 manifest for initial write",
1, base.currentSnapshot().manifests().size());
- String initialManifest = base.currentSnapshot().manifests().get(0);
+ ManifestFile initialManifest = base.currentSnapshot().manifests().get(0);
Snapshot pending = table.newRewrite()
.rewriteFiles(Sets.newSet(FILE_A), Sets.newSet(FILE_C))
@@ -167,8 +167,8 @@ public class TestReplaceFiles extends TableTestBase {
Snapshot pending = rewrite.apply();
Assert.assertEquals("Should produce 2 manifests", 2, pending.manifests().size());
- String manifest1 = pending.manifests().get(0);
- String manifest2 = pending.manifests().get(1);
+ ManifestFile manifest1 = pending.manifests().get(0);
+ ManifestFile manifest2 = pending.manifests().get(1);
validateManifestEntries(manifest1,
ids(pending.snapshotId()), files(FILE_B), statuses(ADDED));
@@ -178,8 +178,8 @@ public class TestReplaceFiles extends TableTestBase {
AssertHelpers.assertThrows("Should retry 4 times and throw last failure",
CommitFailedException.class, "Injected failure", rewrite::commit);
- Assert.assertFalse("Should clean up new manifest", new File(manifest1).exists());
- Assert.assertFalse("Should clean up new manifest", new File(manifest2).exists());
+ Assert.assertFalse("Should clean up new manifest", new File(manifest1.path()).exists());
+ Assert.assertFalse("Should clean up new manifest", new File(manifest2.path()).exists());
// As commit failed all the manifests added with rewrite should be cleaned up
Assert.assertEquals("Only 1 manifest should exist", 1, listMetadataFiles("avro").size());
@@ -197,8 +197,8 @@ public class TestReplaceFiles extends TableTestBase {
Snapshot pending = rewrite.apply();
Assert.assertEquals("Should produce 2 manifests", 2, pending.manifests().size());
- String manifest1 = pending.manifests().get(0);
- String manifest2 = pending.manifests().get(1);
+ ManifestFile manifest1 = pending.manifests().get(0);
+ ManifestFile manifest2 = pending.manifests().get(1);
validateManifestEntries(manifest1,
ids(pending.snapshotId()), files(FILE_B), statuses(ADDED));
@@ -207,8 +207,8 @@ public class TestReplaceFiles extends TableTestBase {
rewrite.commit();
- Assert.assertTrue("Should reuse the manifest for appends", new File(manifest1).exists());
- Assert.assertTrue("Should reuse the manifest with deletes", new File(manifest2).exists());
+ Assert.assertTrue("Should reuse the manifest for appends", new File(manifest1.path()).exists());
+ Assert.assertTrue("Should reuse the manifest with deletes", new File(manifest2.path()).exists());
TableMetadata metadata = readMetadata();
Assert.assertTrue("Should commit the manifest for append",
diff --git a/core/src/test/java/com/netflix/iceberg/TestSnapshotJson.java b/core/src/test/java/com/netflix/iceberg/TestSnapshotJson.java
index 4c55c67..dbcc811 100644
--- a/core/src/test/java/com/netflix/iceberg/TestSnapshotJson.java
+++ b/core/src/test/java/com/netflix/iceberg/TestSnapshotJson.java
@@ -19,19 +19,72 @@
package com.netflix.iceberg;
+import com.google.common.collect.ImmutableList;
import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import static com.netflix.iceberg.Files.localInput;
public class TestSnapshotJson {
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+
+ public TableOperations ops = new LocalTableOperations(temp);
+
@Test
public void testJsonConversion() {
- Snapshot expected = new BaseSnapshot(null, System.currentTimeMillis(),
+ Snapshot expected = new BaseSnapshot(ops, System.currentTimeMillis(),
"file:/tmp/manifest1.avro", "file:/tmp/manifest2.avro");
String json = SnapshotParser.toJson(expected);
- Snapshot snapshot = SnapshotParser.fromJson(null, json);
+ Snapshot snapshot = SnapshotParser.fromJson(ops, json);
+
+ Assert.assertEquals("Snapshot ID should match",
+ expected.snapshotId(), snapshot.snapshotId());
+ Assert.assertEquals("Files should match",
+ expected.manifests(), snapshot.manifests());
+ }
+
+ @Test
+ public void testJsonConversionWithManifestList() throws IOException {
+ long parentId = 1;
+ long id = 2;
+ List<ManifestFile> manifests = ImmutableList.of(
+ new GenericManifestFile(localInput("file:/tmp/manifest1.avro"), 0),
+ new GenericManifestFile(localInput("file:/tmp/manifest2.avro"), 0));
+
+ File manifestList = temp.newFile("manifests");
+ Assert.assertTrue(manifestList.delete());
+ manifestList.deleteOnExit();
+
+ try (ManifestListWriter writer = new ManifestListWriter(
+ Files.localOutput(manifestList), id, parentId)) {
+ writer.addAll(manifests);
+ }
+
+ Snapshot expected = new BaseSnapshot(
+ ops, id, parentId, System.currentTimeMillis(), localInput(manifestList));
+ Snapshot inMemory = new BaseSnapshot(
+ ops, id, parentId, expected.timestampMillis(), manifests);
+
+ Assert.assertEquals("Files should match in memory list",
+ inMemory.manifests(), expected.manifests());
+
+ String json = SnapshotParser.toJson(expected);
+ Snapshot snapshot = SnapshotParser.fromJson(ops, json);
Assert.assertEquals("Snapshot ID should match",
expected.snapshotId(), snapshot.snapshotId());
+ Assert.assertEquals("Timestamp should match",
+ expected.timestampMillis(), snapshot.timestampMillis());
+ Assert.assertEquals("Parent ID should match",
+ expected.parentId(), snapshot.parentId());
+ Assert.assertEquals("Manifest list should match",
+ expected.manifestListLocation(), snapshot.manifestListLocation());
Assert.assertEquals("Files should match",
expected.manifests(), snapshot.manifests());
}
diff --git a/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java b/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java
index 21acdbd..0b04fac 100644
--- a/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java
+++ b/core/src/test/java/com/netflix/iceberg/TestTableMetadataJson.java
@@ -29,7 +29,9 @@ import com.netflix.iceberg.exceptions.RuntimeIOException;
import com.netflix.iceberg.types.Types;
import com.netflix.iceberg.util.JsonUtil;
import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import java.io.IOException;
import java.io.StringWriter;
import java.util.Arrays;
@@ -37,6 +39,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
+import static com.netflix.iceberg.Files.localInput;
import static com.netflix.iceberg.TableMetadataParser.CURRENT_SNAPSHOT_ID;
import static com.netflix.iceberg.TableMetadataParser.FORMAT_VERSION;
import static com.netflix.iceberg.TableMetadataParser.LAST_COLUMN_ID;
@@ -48,6 +51,11 @@ import static com.netflix.iceberg.TableMetadataParser.SCHEMA;
import static com.netflix.iceberg.TableMetadataParser.SNAPSHOTS;
public class TestTableMetadataJson {
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+
+ public TableOperations ops = new LocalTableOperations(temp);
+
@Test
public void testJsonConversion() throws Exception {
Schema schema = new Schema(
@@ -60,23 +68,25 @@ public class TestTableMetadataJson {
long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
Snapshot previousSnapshot = new BaseSnapshot(
- null, previousSnapshotId, null, previousSnapshotId, ImmutableList.of("file:/tmp/manfiest.1.avro"));
+ null, previousSnapshotId, null, previousSnapshotId, ImmutableList.of(
+ new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), spec.specId())));
long currentSnapshotId = System.currentTimeMillis();
Snapshot currentSnapshot = new BaseSnapshot(
- null, currentSnapshotId, previousSnapshotId, currentSnapshotId, ImmutableList.of("file:/tmp/manfiest.2.avro"));
+ null, currentSnapshotId, previousSnapshotId, currentSnapshotId, ImmutableList.of(
+ new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
List<SnapshotLogEntry> snapshotLog = ImmutableList.<SnapshotLogEntry>builder()
.add(new SnapshotLogEntry(previousSnapshot.timestampMillis(), previousSnapshot.snapshotId()))
.add(new SnapshotLogEntry(currentSnapshot.timestampMillis(), currentSnapshot.snapshotId()))
.build();
- TableMetadata expected = new TableMetadata(null, null, "s3://bucket/test/location",
+ TableMetadata expected = new TableMetadata(ops, null, "s3://bucket/test/location",
System.currentTimeMillis(), 3, schema, 5, ImmutableList.of(spec),
ImmutableMap.of("property", "value"), currentSnapshotId,
Arrays.asList(previousSnapshot, currentSnapshot), snapshotLog);
String asJson = TableMetadataParser.toJson(expected);
- TableMetadata metadata = TableMetadataParser.fromJson(null, null,
+ TableMetadata metadata = TableMetadataParser.fromJson(ops, null,
JsonUtil.mapper().readValue(asJson, JsonNode.class));
Assert.assertEquals("Table location should match",
@@ -120,14 +130,16 @@ public class TestTableMetadataJson {
long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
Snapshot previousSnapshot = new BaseSnapshot(
- null, previousSnapshotId, null, previousSnapshotId, ImmutableList.of("file:/tmp/manfiest.1.avro"));
+ ops, previousSnapshotId, null, previousSnapshotId, ImmutableList.of(
+ new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), spec.specId())));
long currentSnapshotId = System.currentTimeMillis();
Snapshot currentSnapshot = new BaseSnapshot(
- null, currentSnapshotId, previousSnapshotId, currentSnapshotId, ImmutableList.of("file:/tmp/manfiest.2.avro"));
+ ops, currentSnapshotId, previousSnapshotId, currentSnapshotId, ImmutableList.of(
+ new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
List<SnapshotLogEntry> reversedSnapshotLog = Lists.newArrayList();
- TableMetadata expected = new TableMetadata(null, null, "s3://bucket/test/location",
+ TableMetadata expected = new TableMetadata(ops, null, "s3://bucket/test/location",
System.currentTimeMillis(), 3, schema, 5, ImmutableList.of(spec),
ImmutableMap.of("property", "value"), currentSnapshotId,
Arrays.asList(previousSnapshot, currentSnapshot), reversedSnapshotLog);
@@ -139,7 +151,7 @@ public class TestTableMetadataJson {
new SnapshotLogEntry(previousSnapshot.timestampMillis(), previousSnapshot.snapshotId()));
String asJson = TableMetadataParser.toJson(expected);
- TableMetadata metadata = TableMetadataParser.fromJson(null, null,
+ TableMetadata metadata = TableMetadataParser.fromJson(ops, null,
JsonUtil.mapper().readValue(asJson, JsonNode.class));
List<SnapshotLogEntry> expectedSnapshotLog = ImmutableList.<SnapshotLogEntry>builder()
@@ -163,18 +175,20 @@ public class TestTableMetadataJson {
long previousSnapshotId = System.currentTimeMillis() - new Random(1234).nextInt(3600);
Snapshot previousSnapshot = new BaseSnapshot(
- null, previousSnapshotId, null, previousSnapshotId, ImmutableList.of("file:/tmp/manfiest.1.avro"));
+ ops, previousSnapshotId, null, previousSnapshotId, ImmutableList.of(
+ new GenericManifestFile(localInput("file:/tmp/manfiest.1.avro"), spec.specId())));
long currentSnapshotId = System.currentTimeMillis();
Snapshot currentSnapshot = new BaseSnapshot(
- null, currentSnapshotId, previousSnapshotId, currentSnapshotId, ImmutableList.of("file:/tmp/manfiest.2.avro"));
+ ops, currentSnapshotId, previousSnapshotId, currentSnapshotId, ImmutableList.of(
+ new GenericManifestFile(localInput("file:/tmp/manfiest.2.avro"), spec.specId())));
- TableMetadata expected = new TableMetadata(null, null, "s3://bucket/test/location",
+ TableMetadata expected = new TableMetadata(ops, null, "s3://bucket/test/location",
System.currentTimeMillis(), 3, schema, 6, ImmutableList.of(spec),
ImmutableMap.of("property", "value"), currentSnapshotId,
Arrays.asList(previousSnapshot, currentSnapshot), ImmutableList.of());
String asJson = toJsonWithoutSpecList(expected);
- TableMetadata metadata = TableMetadataParser.fromJson(null, null,
+ TableMetadata metadata = TableMetadataParser.fromJson(ops, null,
JsonUtil.mapper().readValue(asJson, JsonNode.class));
Assert.assertEquals("Table location should match",
diff --git a/core/src/test/java/com/netflix/iceberg/TestTransaction.java b/core/src/test/java/com/netflix/iceberg/TestTransaction.java
index 317ad5c..bdc3ddd 100644
--- a/core/src/test/java/com/netflix/iceberg/TestTransaction.java
+++ b/core/src/test/java/com/netflix/iceberg/TestTransaction.java
@@ -274,7 +274,7 @@ public class TestTransaction extends TableTestBase {
.appendFile(FILE_B)
.commit();
- Set<String> appendManifests = Sets.newHashSet(t.table().currentSnapshot().manifests());
+ Set<ManifestFile> appendManifests = Sets.newHashSet(t.table().currentSnapshot().manifests());
Assert.assertSame("Base metadata should not change when commit is created",
base, readMetadata());
@@ -313,7 +313,7 @@ public class TestTransaction extends TableTestBase {
.appendFile(FILE_B)
.commit();
- Set<String> appendManifests = Sets.newHashSet(t.table().currentSnapshot().manifests());
+ Set<ManifestFile> appendManifests = Sets.newHashSet(t.table().currentSnapshot().manifests());
Assert.assertSame("Base metadata should not change when commit is created",
base, readMetadata());
@@ -327,13 +327,13 @@ public class TestTransaction extends TableTestBase {
Assert.assertEquals("Table should be on version 2 after real append", 2, (int) version());
- Set<String> conflictAppendManifests = Sets.newHashSet(table.currentSnapshot().manifests());
+ Set<ManifestFile> conflictAppendManifests = Sets.newHashSet(table.currentSnapshot().manifests());
t.commitTransaction();
Assert.assertEquals("Table should be on version 3 after commit", 3, (int) version());
- Set<String> expectedManifests = Sets.newHashSet();
+ Set<ManifestFile> expectedManifests = Sets.newHashSet();
expectedManifests.addAll(appendManifests);
expectedManifests.addAll(conflictAppendManifests);
@@ -370,7 +370,7 @@ public class TestTransaction extends TableTestBase {
Assert.assertEquals("Append should create one manifest",
1, t.table().currentSnapshot().manifests().size());
- String appendManifest = t.table().currentSnapshot().manifests().get(0);
+ ManifestFile appendManifest = t.table().currentSnapshot().manifests().get(0);
Assert.assertSame("Base metadata should not change when commit is created",
base, readMetadata());
@@ -384,13 +384,13 @@ public class TestTransaction extends TableTestBase {
Assert.assertEquals("Table should be on version 2 after real append", 2, (int) version());
- Set<String> conflictAppendManifests = Sets.newHashSet(table.currentSnapshot().manifests());
+ Set<ManifestFile> conflictAppendManifests = Sets.newHashSet(table.currentSnapshot().manifests());
t.commitTransaction();
Assert.assertEquals("Table should be on version 3 after commit", 3, (int) version());
- Set<String> previousManifests = Sets.newHashSet();
+ Set<ManifestFile> previousManifests = Sets.newHashSet();
previousManifests.add(appendManifest);
previousManifests.addAll(conflictAppendManifests);
@@ -399,7 +399,7 @@ public class TestTransaction extends TableTestBase {
Assert.assertFalse("Should merge both commit manifests into a new manifest",
previousManifests.contains(table.currentSnapshot().manifests().get(0)));
- Assert.assertFalse("Append manifest should be deleted", new File(appendManifest).exists());
+ Assert.assertFalse("Append manifest should be deleted", new File(appendManifest.path()).exists());
}
@Test
@@ -427,7 +427,7 @@ public class TestTransaction extends TableTestBase {
Assert.assertEquals("Append should create one manifest",
1, t.table().currentSnapshot().manifests().size());
- String appendManifest = t.table().currentSnapshot().manifests().get(0);
+ ManifestFile appendManifest = t.table().currentSnapshot().manifests().get(0);
Assert.assertSame("Base metadata should not change when commit is created",
base, readMetadata());
@@ -441,13 +441,13 @@ public class TestTransaction extends TableTestBase {
Assert.assertEquals("Table should be on version 2 after real append", 2, (int) version());
- Set<String> conflictAppendManifests = Sets.newHashSet(table.currentSnapshot().manifests());
+ Set<ManifestFile> conflictAppendManifests = Sets.newHashSet(table.currentSnapshot().manifests());
t.commitTransaction();
Assert.assertEquals("Table should be on version 3 after commit", 3, (int) version());
- Set<String> previousManifests = Sets.newHashSet();
+ Set<ManifestFile> previousManifests = Sets.newHashSet();
previousManifests.add(appendManifest);
previousManifests.addAll(conflictAppendManifests);
@@ -456,6 +456,6 @@ public class TestTransaction extends TableTestBase {
Assert.assertFalse("Should merge both commit manifests into a new manifest",
previousManifests.contains(table.currentSnapshot().manifests().get(0)));
- Assert.assertFalse("Append manifest should be deleted", new File(appendManifest).exists());
+ Assert.assertFalse("Append manifest should be deleted", new File(appendManifest.path()).exists());
}
}