Posted to commits@iceberg.apache.org by ao...@apache.org on 2020/05/29 20:13:48 UTC
[iceberg] branch master updated: Add DeleteFile and manifest reader and writer for deletes (#1064)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 527240b Add DeleteFile and manifest reader and writer for deletes (#1064)
527240b is described below
commit 527240b445b23cef1a655eccbb3b2c0eb7d178c1
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Fri May 29 12:26:56 2020 -0700
Add DeleteFile and manifest reader and writer for deletes (#1064)
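
For orientation, the hierarchy this commit introduces, abbreviated from the diff below (method lists trimmed; the full definitions follow):

    // Mirrors the org.apache.iceberg types added/changed in this commit.
    interface ContentFile<F> {
      FileContent content();   // DATA, POSITION_DELETES, or EQUALITY_DELETES
      F copy();
      F copyWithoutStats();
      // ... shared accessors: path(), format(), partition(), metrics ...
    }

    interface DataFile extends ContentFile<DataFile> { }

    interface DeleteFile extends ContentFile<DeleteFile> { }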
---
.../main/java/org/apache/iceberg/ContentFile.java | 116 +++++++
api/src/main/java/org/apache/iceberg/DataFile.java | 87 +----
.../main/java/org/apache/iceberg/DeleteFile.java | 17 +-
.../expressions/InclusiveMetricsEvaluator.java | 7 +-
.../{GenericDataFile.java => BaseFile.java} | 272 +++++++--------
.../org/apache/iceberg/BaseManifestReader.java | 25 +-
.../org/apache/iceberg/BaseMetastoreCatalog.java | 2 +-
.../org/apache/iceberg/BaseRewriteManifests.java | 6 +-
.../main/java/org/apache/iceberg/BaseSnapshot.java | 4 +-
...ableMetadata.java => DeleteManifestReader.java} | 20 +-
.../main/java/org/apache/iceberg/FileHistory.java | 8 +-
.../main/java/org/apache/iceberg/FindFiles.java | 2 +-
.../java/org/apache/iceberg/GenericDataFile.java | 375 +--------------------
.../java/org/apache/iceberg/GenericDeleteFile.java | 76 +++++
.../org/apache/iceberg/GenericManifestEntry.java | 36 +-
.../org/apache/iceberg/InheritableMetadata.java | 2 +-
.../apache/iceberg/InheritableMetadataFactory.java | 6 +-
.../org/apache/iceberg/ManifestEntriesTable.java | 2 +-
.../java/org/apache/iceberg/ManifestEntry.java | 8 +-
.../java/org/apache/iceberg/ManifestFiles.java | 48 ++-
.../java/org/apache/iceberg/ManifestGroup.java | 10 +-
.../java/org/apache/iceberg/ManifestWriter.java | 117 ++++---
.../apache/iceberg/MergingSnapshotProducer.java | 8 +-
.../java/org/apache/iceberg/RemoveSnapshots.java | 4 +-
.../main/java/org/apache/iceberg/ScanSummary.java | 6 +-
.../java/org/apache/iceberg/SnapshotProducer.java | 4 +-
.../main/java/org/apache/iceberg/V1Metadata.java | 15 +-
.../main/java/org/apache/iceberg/V2Metadata.java | 42 ++-
.../java/org/apache/iceberg/TableTestBase.java | 24 +-
.../org/apache/iceberg/TestManifestReader.java | 6 +-
.../apache/iceberg/TestManifestWriterVersions.java | 91 ++++-
.../java/org/apache/iceberg/TestMergeAppend.java | 2 +-
.../org/apache/iceberg/TestRewriteManifests.java | 6 +-
33 files changed, 668 insertions(+), 786 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/ContentFile.java b/api/src/main/java/org/apache/iceberg/ContentFile.java
new file mode 100644
index 0000000..262ea17
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/ContentFile.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Superinterface of {@link DataFile} and {@link DeleteFile} that exposes common methods.
+ *
+ * @param <F> the concrete Java class of a ContentFile instance.
+ */
+public interface ContentFile<F> {
+ /**
+ * @return type of content stored in the file; one of DATA, POSITION_DELETES, or EQUALITY_DELETES
+ */
+ FileContent content();
+
+ /**
+ * @return fully qualified path to the file, suitable for constructing a Hadoop Path
+ */
+ CharSequence path();
+
+ /**
+ * @return format of the file
+ */
+ FileFormat format();
+
+ /**
+ * @return partition for this file as a {@link StructLike}
+ */
+ StructLike partition();
+
+ /**
+ * @return the number of top-level records in the file
+ */
+ long recordCount();
+
+ /**
+ * @return the file size in bytes
+ */
+ long fileSizeInBytes();
+
+ /**
+ * @return if collected, map from column ID to the size of the column in bytes, null otherwise
+ */
+ Map<Integer, Long> columnSizes();
+
+ /**
+ * @return if collected, map from column ID to the count of its non-null values, null otherwise
+ */
+ Map<Integer, Long> valueCounts();
+
+ /**
+ * @return if collected, map from column ID to its null value count, null otherwise
+ */
+ Map<Integer, Long> nullValueCounts();
+
+ /**
+ * @return if collected, map from column ID to value lower bounds, null otherwise
+ */
+ Map<Integer, ByteBuffer> lowerBounds();
+
+ /**
+ * @return if collected, map from column ID to value upper bounds, null otherwise
+ */
+ Map<Integer, ByteBuffer> upperBounds();
+
+ /**
+ * @return metadata about how this file is encrypted, or null if the file is stored in plain
+ * text.
+ */
+ ByteBuffer keyMetadata();
+
+ /**
+ * @return List of recommended split locations, if applicable, null otherwise.
+ * When available, this information is used for planning scan tasks whose boundaries
+ * are determined by these offsets. The returned list must be sorted in ascending order.
+ */
+ List<Long> splitOffsets();
+
+
+ /**
+ * Copies this file. Manifest readers can reuse file instances; use
+ * this method to copy data when collecting files from tasks.
+ *
+ * @return a copy of this file
+ */
+ F copy();
+
+ /**
+ * Copies this file without file stats. Manifest readers can reuse file instances; use
+ * this method to copy data without stats when collecting files.
+ *
+ * @return a copy of this file, without lower bounds, upper bounds, value counts, or null value counts
+ */
+ F copyWithoutStats();
+}
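
As a usage note (not part of this commit), the new superinterface lets helper code treat data files and delete files uniformly. A minimal sketch; the class and method names are hypothetical:

    import org.apache.iceberg.ContentFile;

    class ContentFileSummary {
      // Uses only methods declared on ContentFile, so it accepts
      // both DataFile and DeleteFile instances.
      static String summarize(ContentFile<?> file) {
        return String.format("%s %s: %d records, %d bytes",
            file.content(), file.path(),
            file.recordCount(), file.fileSizeInBytes());
      }
    }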
diff --git a/api/src/main/java/org/apache/iceberg/DataFile.java b/api/src/main/java/org/apache/iceberg/DataFile.java
index d9a4441..747057b 100644
--- a/api/src/main/java/org/apache/iceberg/DataFile.java
+++ b/api/src/main/java/org/apache/iceberg/DataFile.java
@@ -19,9 +19,6 @@
package org.apache.iceberg;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.Map;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.BinaryType;
import org.apache.iceberg.types.Types.IntegerType;
@@ -35,9 +32,9 @@ import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
/**
- * Interface for files listed in a table manifest.
+ * Interface for data files listed in a table manifest.
*/
-public interface DataFile {
+public interface DataFile extends ContentFile<DataFile> {
// fields for adding delete data files
Types.NestedField CONTENT = optional(134, "content", IntegerType.get(),
"Contents of the file: 0=data, 1=position deletes, 2=equality deletes");
@@ -86,86 +83,8 @@ public interface DataFile {
/**
* @return the content stored in the file; one of DATA, POSITION_DELETES, or EQUALITY_DELETES
*/
+ @Override
default FileContent content() {
return FileContent.DATA;
}
-
- /**
- * @return fully qualified path to the file, suitable for constructing a Hadoop Path
- */
- CharSequence path();
-
- /**
- * @return format of the data file
- */
- FileFormat format();
-
- /**
- * @return partition data for this file as a {@link StructLike}
- */
- StructLike partition();
-
- /**
- * @return the number of top-level records in the data file
- */
- long recordCount();
-
- /**
- * @return the data file size in bytes
- */
- long fileSizeInBytes();
-
- /**
- * @return if collected, map from column ID to the size of the column in bytes, null otherwise
- */
- Map<Integer, Long> columnSizes();
-
- /**
- * @return if collected, map from column ID to the count of its non-null values, null otherwise
- */
- Map<Integer, Long> valueCounts();
-
- /**
- * @return if collected, map from column ID to its null value count, null otherwise
- */
- Map<Integer, Long> nullValueCounts();
-
- /**
- * @return if collected, map from column ID to value lower bounds, null otherwise
- */
- Map<Integer, ByteBuffer> lowerBounds();
-
- /**
- * @return if collected, map from column ID to value upper bounds, null otherwise
- */
- Map<Integer, ByteBuffer> upperBounds();
-
- /**
- * @return metadata about how this file is encrypted, or null if the file is stored in plain
- * text.
- */
- ByteBuffer keyMetadata();
-
- /**
- * @return List of recommended split locations, if applicable, null otherwise.
- * When available, this information is used for planning scan tasks whose boundaries
- * are determined by these offsets. The returned list must be sorted in ascending order.
- */
- List<Long> splitOffsets();
-
- /**
- * Copies this {@link DataFile data file}. Manifest readers can reuse data file instances; use
- * this method to copy data when collecting files from tasks.
- *
- * @return a copy of this data file
- */
- DataFile copy();
-
- /**
- * Copies this {@link DataFile data file} without file stats. Manifest readers can reuse data file instances; use
- * this method to copy data without stats when collecting files.
- *
- * @return a copy of this data file, without lower bounds, upper bounds, value counts, or null value counts
- */
- DataFile copyWithoutStats();
}
diff --git a/core/src/main/java/org/apache/iceberg/InheritableMetadata.java b/api/src/main/java/org/apache/iceberg/DeleteFile.java
similarity index 62%
copy from core/src/main/java/org/apache/iceberg/InheritableMetadata.java
copy to api/src/main/java/org/apache/iceberg/DeleteFile.java
index 5eebe8a..9adc0fb 100644
--- a/core/src/main/java/org/apache/iceberg/InheritableMetadata.java
+++ b/api/src/main/java/org/apache/iceberg/DeleteFile.java
@@ -19,8 +19,19 @@
package org.apache.iceberg;
-import java.io.Serializable;
+import java.util.List;
-interface InheritableMetadata extends Serializable {
- ManifestEntry apply(ManifestEntry manifestEntry);
+/**
+ * Interface for delete files listed in a table delete manifest.
+ */
+public interface DeleteFile extends ContentFile<DeleteFile> {
+ /**
+ * @return List of recommended split locations, if applicable, null otherwise.
+ * When available, this information is used for planning scan tasks whose boundaries
+ * are determined by these offsets. The returned list must be sorted in ascending order.
+ */
+ @Override
+ default List<Long> splitOffsets() {
+ return null;
+ }
}
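
A note on the default above (not part of this commit): a null split offset list means no offsets were collected, so a planner should schedule the delete file as a single scan task. A minimal, hypothetical check:

    import java.util.List;
    import org.apache.iceberg.ContentFile;

    class SplitCheck {
      // Null or single-element offset lists yield one scan task.
      static boolean isSplittable(ContentFile<?> file) {
        List<Long> offsets = file.splitOffsets();
        return offsets != null && offsets.size() > 1;
      }
    }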
diff --git a/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java b/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java
index 4069586..148ac4f 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/InclusiveMetricsEvaluator.java
@@ -25,6 +25,7 @@ import java.util.Comparator;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.ExpressionVisitors.BoundExpressionVisitor;
@@ -40,7 +41,7 @@ import static org.apache.iceberg.expressions.Expressions.rewriteNot;
* <p>
* This evaluation is inclusive: it returns true if a file may match and false if it cannot match.
* <p>
- * Files are passed to {@link #eval(DataFile)}, which returns true if the file may contain matching
+ * Files are passed to {@link #eval(ContentFile)}, which returns true if the file may contain matching
* rows and false if the file cannot contain matching rows. Files may be skipped if and only if the
* return value of {@code eval} is false.
*/
@@ -70,7 +71,7 @@ public class InclusiveMetricsEvaluator {
* @param file a data file
* @return false if the file cannot contain rows that match the expression, true otherwise.
*/
- public boolean eval(DataFile file) {
+ public boolean eval(ContentFile<?> file) {
// TODO: detect the case where a column is missing from the file using file's max field id.
return visitor().eval(file);
}
@@ -84,7 +85,7 @@ public class InclusiveMetricsEvaluator {
private Map<Integer, ByteBuffer> lowerBounds = null;
private Map<Integer, ByteBuffer> upperBounds = null;
- private boolean eval(DataFile file) {
+ private boolean eval(ContentFile<?> file) {
if (file.recordCount() == 0) {
return ROWS_CANNOT_MATCH;
}
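
Usage sketch (not part of this commit): with the signature widened to ContentFile, one evaluator can prune data files and delete files alike by their column metrics. The schema and predicate below are illustrative:

    import org.apache.iceberg.ContentFile;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.iceberg.expressions.InclusiveMetricsEvaluator;
    import org.apache.iceberg.types.Types;

    class PruneExample {
      static boolean mayMatch(ContentFile<?> file) {
        Schema schema = new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()));
        InclusiveMetricsEvaluator evaluator =
            new InclusiveMetricsEvaluator(schema, Expressions.greaterThan("id", 100L));
        // false means the file cannot contain matching rows and may be skipped
        return evaluator.eval(file);
      }
    }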
diff --git a/core/src/main/java/org/apache/iceberg/GenericDataFile.java b/core/src/main/java/org/apache/iceberg/BaseFile.java
similarity index 70%
copy from core/src/main/java/org/apache/iceberg/GenericDataFile.java
copy to core/src/main/java/org/apache/iceberg/BaseFile.java
index 6c6b1ba..7b03418 100644
--- a/core/src/main/java/org/apache/iceberg/GenericDataFile.java
+++ b/core/src/main/java/org/apache/iceberg/BaseFile.java
@@ -19,28 +19,31 @@
package org.apache.iceberg;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
+import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.specific.SpecificData;
import org.apache.iceberg.avro.AvroSchemaUtil;
-import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ByteBuffers;
-class GenericDataFile
- implements DataFile, IndexedRecord, StructLike, SpecificData.SchemaConstructable, Serializable {
- private static final Types.StructType EMPTY_STRUCT_TYPE = Types.StructType.of();
- private static final PartitionData EMPTY_PARTITION_DATA = new PartitionData(EMPTY_STRUCT_TYPE) {
+/**
+ * Base class for both {@link DataFile} and {@link DeleteFile}.
+ */
+abstract class BaseFile<F>
+ implements ContentFile<F>, IndexedRecord, StructLike, SpecificData.SchemaConstructable, Serializable {
+ static final Types.StructType EMPTY_STRUCT_TYPE = Types.StructType.of();
+ static final PartitionData EMPTY_PARTITION_DATA = new PartitionData(EMPTY_STRUCT_TYPE) {
@Override
public PartitionData copy() {
return this; // this does not change
@@ -50,6 +53,7 @@ class GenericDataFile
private int[] fromProjectionPos;
private Types.StructType partitionType;
+ private FileContent content = FileContent.DATA;
private String filePath = null;
private FileFormat format = null;
private PartitionData partitionData = null;
@@ -66,13 +70,12 @@ class GenericDataFile
private byte[] keyMetadata = null;
// cached schema
- private transient org.apache.avro.Schema avroSchema = null;
+ private transient Schema avroSchema = null;
/**
* Used by Avro reflection to instantiate this class when reading manifest files.
*/
- @SuppressWarnings("checkstyle:RedundantModifier") // Must be public
- public GenericDataFile(org.apache.avro.Schema avroSchema) {
+ BaseFile(Schema avroSchema) {
this.avroSchema = avroSchema;
Types.StructType schema = AvroSchemaUtil.convert(avroSchema).asNestedType().asStructType();
@@ -105,28 +108,12 @@ class GenericDataFile
this.partitionData = new PartitionData(partitionType);
}
- GenericDataFile(String filePath, FileFormat format, long recordCount,
- long fileSizeInBytes) {
- this.filePath = filePath;
- this.format = format;
- this.partitionData = EMPTY_PARTITION_DATA;
- this.partitionType = EMPTY_PARTITION_DATA.getPartitionType();
- this.recordCount = recordCount;
- this.fileSizeInBytes = fileSizeInBytes;
- }
-
- GenericDataFile(String filePath, FileFormat format, PartitionData partition,
- long recordCount, long fileSizeInBytes) {
- this.filePath = filePath;
- this.format = format;
- this.partitionData = partition;
- this.partitionType = partition.getPartitionType();
- this.recordCount = recordCount;
- this.fileSizeInBytes = fileSizeInBytes;
- }
-
- GenericDataFile(String filePath, FileFormat format, PartitionData partition,
- long fileSizeInBytes, Metrics metrics, List<Long> splitOffsets) {
+ BaseFile(FileContent content, String filePath, FileFormat format,
+ PartitionData partition, long fileSizeInBytes, long recordCount,
+ Map<Integer, Long> columnSizes, Map<Integer, Long> valueCounts, Map<Integer, Long> nullValueCounts,
+ Map<Integer, ByteBuffer> lowerBounds, Map<Integer, ByteBuffer> upperBounds, List<Long> splitOffsets,
+ ByteBuffer keyMetadata) {
+ this.content = content;
this.filePath = filePath;
this.format = format;
@@ -140,20 +127,14 @@ class GenericDataFile
}
// this will throw NPE if metrics.recordCount is null
- this.recordCount = metrics.recordCount();
+ this.recordCount = recordCount;
this.fileSizeInBytes = fileSizeInBytes;
- this.columnSizes = metrics.columnSizes();
- this.valueCounts = metrics.valueCounts();
- this.nullValueCounts = metrics.nullValueCounts();
- this.lowerBounds = SerializableByteBufferMap.wrap(metrics.lowerBounds());
- this.upperBounds = SerializableByteBufferMap.wrap(metrics.upperBounds());
+ this.columnSizes = columnSizes;
+ this.valueCounts = valueCounts;
+ this.nullValueCounts = nullValueCounts;
+ this.lowerBounds = SerializableByteBufferMap.wrap(lowerBounds);
+ this.upperBounds = SerializableByteBufferMap.wrap(upperBounds);
this.splitOffsets = copy(splitOffsets);
- }
-
- GenericDataFile(String filePath, FileFormat format, PartitionData partition,
- long fileSizeInBytes, Metrics metrics,
- ByteBuffer keyMetadata, List<Long> splitOffsets) {
- this(filePath, format, partition, fileSizeInBytes, metrics, splitOffsets);
this.keyMetadata = ByteBuffers.toByteArray(keyMetadata);
}
@@ -163,7 +144,8 @@ class GenericDataFile
* @param toCopy a generic data file to copy.
* @param fullCopy whether to copy all fields or to drop column-level stats
*/
- private GenericDataFile(GenericDataFile toCopy, boolean fullCopy) {
+ BaseFile(BaseFile<F> toCopy, boolean fullCopy) {
+ this.content = toCopy.content;
this.filePath = toCopy.filePath;
this.format = toCopy.format;
this.partitionData = toCopy.partitionData.copy();
@@ -192,76 +174,13 @@ class GenericDataFile
/**
* Constructor for Java serialization.
*/
- GenericDataFile() {
+ BaseFile() {
}
- @Override
- public FileContent content() {
- return FileContent.DATA;
- }
+ protected abstract Schema getAvroSchema(Types.StructType partitionStruct);
@Override
- public CharSequence path() {
- return filePath;
- }
-
- @Override
- public FileFormat format() {
- return format;
- }
-
- @Override
- public StructLike partition() {
- return partitionData;
- }
-
- @Override
- public long recordCount() {
- return recordCount;
- }
-
- @Override
- public long fileSizeInBytes() {
- return fileSizeInBytes;
- }
-
- @Override
- public Map<Integer, Long> columnSizes() {
- return columnSizes;
- }
-
- @Override
- public Map<Integer, Long> valueCounts() {
- return valueCounts;
- }
-
- @Override
- public Map<Integer, Long> nullValueCounts() {
- return nullValueCounts;
- }
-
- @Override
- public Map<Integer, ByteBuffer> lowerBounds() {
- return lowerBounds;
- }
-
- @Override
- public Map<Integer, ByteBuffer> upperBounds() {
- return upperBounds;
- }
-
- @Override
- public ByteBuffer keyMetadata() {
- return keyMetadata != null ? ByteBuffer.wrap(keyMetadata) : null;
- }
-
- @Override
- public List<Long> splitOffsets() {
- return splitOffsets;
- }
-
- @Override
- public org.apache.avro.Schema getSchema() {
+ public Schema getSchema() {
if (avroSchema == null) {
this.avroSchema = getAvroSchema(partitionType);
}
@@ -270,7 +189,7 @@ class GenericDataFile
@Override
@SuppressWarnings("unchecked")
- public void put(int i, Object v) {
+ public void put(int i, Object value) {
int pos = i;
// if the schema was projected, map the incoming ordinal to the expected one
if (fromProjectionPos != null) {
@@ -278,45 +197,44 @@ class GenericDataFile
}
switch (pos) {
case 0:
- Preconditions.checkState(v == null || (Integer) v == FileContent.DATA.id(),
- "Invalid content for a DataFile: %s", v);
+ this.content = value != null ? FileContent.values()[(Integer) value] : FileContent.DATA;
return;
case 1:
// always coerce to String for Serializable
- this.filePath = v.toString();
+ this.filePath = value.toString();
return;
case 2:
- this.format = FileFormat.valueOf(v.toString());
+ this.format = FileFormat.valueOf(value.toString());
return;
case 3:
- this.partitionData = (PartitionData) v;
+ this.partitionData = (PartitionData) value;
return;
case 4:
- this.recordCount = (Long) v;
+ this.recordCount = (Long) value;
return;
case 5:
- this.fileSizeInBytes = (Long) v;
+ this.fileSizeInBytes = (Long) value;
return;
case 6:
- this.columnSizes = (Map<Integer, Long>) v;
+ this.columnSizes = (Map<Integer, Long>) value;
return;
case 7:
- this.valueCounts = (Map<Integer, Long>) v;
+ this.valueCounts = (Map<Integer, Long>) value;
return;
case 8:
- this.nullValueCounts = (Map<Integer, Long>) v;
+ this.nullValueCounts = (Map<Integer, Long>) value;
return;
case 9:
- this.lowerBounds = SerializableByteBufferMap.wrap((Map<Integer, ByteBuffer>) v);
+ this.lowerBounds = SerializableByteBufferMap.wrap((Map<Integer, ByteBuffer>) value);
return;
case 10:
- this.upperBounds = SerializableByteBufferMap.wrap((Map<Integer, ByteBuffer>) v);
+ this.upperBounds = SerializableByteBufferMap.wrap((Map<Integer, ByteBuffer>) value);
return;
case 11:
- this.keyMetadata = ByteBuffers.toByteArray((ByteBuffer) v);
+ this.keyMetadata = ByteBuffers.toByteArray((ByteBuffer) value);
return;
case 12:
- this.splitOffsets = (List<Long>) v;
+ this.splitOffsets = (List<Long>) value;
return;
default:
// ignore the object, it must be from a newer version of the format
@@ -337,7 +255,7 @@ class GenericDataFile
}
switch (pos) {
case 0:
- return FileContent.DATA.id();
+ return content.id();
case 1:
return filePath;
case 2:
@@ -359,7 +277,7 @@ class GenericDataFile
case 10:
return upperBounds;
case 11:
- return keyMetadata();
+ return keyMetadata != null ? ByteBuffer.wrap(keyMetadata) : null;
case 12:
return splitOffsets;
default:
@@ -372,44 +290,61 @@ class GenericDataFile
return javaClass.cast(get(pos));
}
- private static org.apache.avro.Schema getAvroSchema(Types.StructType partitionType) {
- Types.StructType type = DataFile.getType(partitionType);
- return AvroSchemaUtil.convert(type, ImmutableMap.of(
- type, GenericDataFile.class.getName(),
- partitionType, PartitionData.class.getName()));
- }
-
@Override
public int size() {
return DataFile.getType(EMPTY_STRUCT_TYPE).fields().size();
}
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("file_path", filePath)
- .add("file_format", format)
- .add("partition", partitionData)
- .add("record_count", recordCount)
- .add("file_size_in_bytes", fileSizeInBytes)
- .add("column_sizes", columnSizes)
- .add("value_counts", valueCounts)
- .add("null_value_counts", nullValueCounts)
- .add("lower_bounds", lowerBounds)
- .add("upper_bounds", upperBounds)
- .add("key_metadata", keyMetadata == null ? "null" : "(redacted)")
- .add("split_offsets", splitOffsets == null ? "null" : splitOffsets)
- .toString();
+ public FileContent content() {
+ return content;
}
- @Override
- public DataFile copyWithoutStats() {
- return new GenericDataFile(this, false /* drop stats */);
+ public CharSequence path() {
+ return filePath;
}
- @Override
- public DataFile copy() {
- return new GenericDataFile(this, true /* full copy */);
+ public FileFormat format() {
+ return format;
+ }
+
+ public StructLike partition() {
+ return partitionData;
+ }
+
+ public long recordCount() {
+ return recordCount;
+ }
+
+ public long fileSizeInBytes() {
+ return fileSizeInBytes;
+ }
+
+ public Map<Integer, Long> columnSizes() {
+ return columnSizes;
+ }
+
+ public Map<Integer, Long> valueCounts() {
+ return valueCounts;
+ }
+
+ public Map<Integer, Long> nullValueCounts() {
+ return nullValueCounts;
+ }
+
+ public Map<Integer, ByteBuffer> lowerBounds() {
+ return lowerBounds;
+ }
+
+ public Map<Integer, ByteBuffer> upperBounds() {
+ return upperBounds;
+ }
+
+ public ByteBuffer keyMetadata() {
+ return keyMetadata != null ? ByteBuffer.wrap(keyMetadata) : null;
+ }
+
+ public List<Long> splitOffsets() {
+ return splitOffsets;
}
private static <K, V> Map<K, V> copy(Map<K, V> map) {
@@ -429,4 +364,23 @@ class GenericDataFile
}
return null;
}
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("content", content.toString().toLowerCase(Locale.ROOT))
+ .add("file_path", filePath)
+ .add("file_format", format)
+ .add("partition", partitionData)
+ .add("record_count", recordCount)
+ .add("file_size_in_bytes", fileSizeInBytes)
+ .add("column_sizes", columnSizes)
+ .add("value_counts", valueCounts)
+ .add("null_value_counts", nullValueCounts)
+ .add("lower_bounds", lowerBounds)
+ .add("upper_bounds", upperBounds)
+ .add("key_metadata", keyMetadata == null ? "null" : "(redacted)")
+ .add("split_offsets", splitOffsets == null ? "null" : splitOffsets)
+ .toString();
+ }
}
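
Usage sketch (not part of this commit) for the copy semantics above: readers and scan tasks may reuse file instances, so callers should collect defensive copies; copy() retains the stats maps, copyWithoutStats() drops them:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.io.CloseableIterable;

    class CollectFiles {
      static List<DataFile> plannedFiles(Table table) throws IOException {
        List<DataFile> files = new ArrayList<>();
        try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
          for (FileScanTask task : tasks) {
            files.add(task.file().copy()); // defensive copy; instances may be reused
          }
        }
        return files;
      }
    }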
diff --git a/core/src/main/java/org/apache/iceberg/BaseManifestReader.java b/core/src/main/java/org/apache/iceberg/BaseManifestReader.java
index d253555..7217fc0 100644
--- a/core/src/main/java/org/apache/iceberg/BaseManifestReader.java
+++ b/core/src/main/java/org/apache/iceberg/BaseManifestReader.java
@@ -47,17 +47,18 @@ import static org.apache.iceberg.expressions.Expressions.alwaysTrue;
/**
* Base reader for data and delete manifest files.
*
- * @param <T> The Java class of files returned by this reader.
+ * @param <F> The Java class of files returned by this reader.
* @param <ThisT> The Java class of this reader, returned by configuration methods.
*/
-abstract class BaseManifestReader<T, ThisT> extends CloseableGroup implements CloseableIterable<T> {
+abstract class BaseManifestReader<F extends ContentFile<F>, ThisT>
+ extends CloseableGroup implements CloseableIterable<F> {
static final ImmutableList<String> ALL_COLUMNS = ImmutableList.of("*");
private static final Set<String> STATS_COLUMNS = Sets.newHashSet(
"value_counts", "null_value_counts", "lower_bounds", "upper_bounds");
protected enum FileType {
DATA_FILES(GenericDataFile.class.getName()),
- DELETE_FILES("...");
+ DELETE_FILES(GenericDeleteFile.class.getName());
private final String fileClass;
@@ -95,7 +96,7 @@ abstract class BaseManifestReader<T, ThisT> extends CloseableGroup implements Cl
this.content = content;
try {
- try (AvroIterable<ManifestEntry> headerReader = Avro.read(file)
+ try (AvroIterable<ManifestEntry<F>> headerReader = Avro.read(file)
.project(ManifestEntry.getSchema(Types.StructType.of()).select("status"))
.build()) {
this.metadata = headerReader.getMetadata();
@@ -163,7 +164,7 @@ abstract class BaseManifestReader<T, ThisT> extends CloseableGroup implements Cl
return self();
}
- CloseableIterable<ManifestEntry> entries() {
+ CloseableIterable<ManifestEntry<F>> entries() {
if ((rowFilter != null && rowFilter != Expressions.alwaysTrue()) ||
(partFilter != null && partFilter != Expressions.alwaysTrue())) {
Evaluator evaluator = evaluator();
@@ -183,13 +184,13 @@ abstract class BaseManifestReader<T, ThisT> extends CloseableGroup implements Cl
}
}
- private CloseableIterable<ManifestEntry> open(Schema projection) {
+ private CloseableIterable<ManifestEntry<F>> open(Schema projection) {
FileFormat format = FileFormat.fromFileName(file.location());
Preconditions.checkArgument(format != null, "Unable to determine format of manifest: %s", file);
switch (format) {
case AVRO:
- AvroIterable<ManifestEntry> reader = Avro.read(file)
+ AvroIterable<ManifestEntry<F>> reader = Avro.read(file)
.project(ManifestEntry.wrapFileSchema(projection.asStruct()))
.rename("manifest_entry", GenericManifestEntry.class.getName())
.rename("partition", PartitionData.class.getName())
@@ -209,7 +210,7 @@ abstract class BaseManifestReader<T, ThisT> extends CloseableGroup implements Cl
}
}
- CloseableIterable<ManifestEntry> liveEntries() {
+ CloseableIterable<ManifestEntry<F>> liveEntries() {
return CloseableIterable.filter(entries(),
entry -> entry != null && entry.status() != ManifestEntry.Status.DELETED);
}
@@ -218,13 +219,11 @@ abstract class BaseManifestReader<T, ThisT> extends CloseableGroup implements Cl
* @return an Iterator of DataFile. Makes defensive copies of files before returning
*/
@Override
- @SuppressWarnings("unchecked")
- public CloseableIterator<T> iterator() {
+ public CloseableIterator<F> iterator() {
if (dropStats(rowFilter, columns)) {
- return (CloseableIterator<T>) CloseableIterable.transform(liveEntries(), e -> e.file().copyWithoutStats())
- .iterator();
+ return CloseableIterable.transform(liveEntries(), e -> e.file().copyWithoutStats()).iterator();
} else {
- return (CloseableIterator<T>) CloseableIterable.transform(liveEntries(), e -> e.file().copy()).iterator();
+ return CloseableIterable.transform(liveEntries(), e -> e.file().copy()).iterator();
}
}
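
Usage sketch (not part of this commit): with the file type carried in the reader's signature, iteration is typed and the unchecked casts removed above are unnecessary:

    import java.io.IOException;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.ManifestFile;
    import org.apache.iceberg.ManifestFiles;
    import org.apache.iceberg.ManifestReader;
    import org.apache.iceberg.io.FileIO;

    class ReadDataManifest {
      static void printPaths(ManifestFile manifest, FileIO io) throws IOException {
        try (ManifestReader reader = ManifestFiles.read(manifest, io)) {
          for (DataFile file : reader) { // the iterator returns defensive copies
            System.out.println(file.path());
          }
        }
      }
    }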
diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
index 5fd02f5..8d0951e 100644
--- a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
+++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java
@@ -262,7 +262,7 @@ public abstract class BaseMetastoreCatalog implements Catalog {
.onFailure((item, exc) -> LOG.warn("Failed to get deleted files: this may cause orphaned data files", exc))
.run(manifest -> {
try (ManifestReader reader = ManifestFiles.read(manifest, io)) {
- for (ManifestEntry entry : reader.entries()) {
+ for (ManifestEntry<?> entry : reader.entries()) {
// intern the file path because the weak key map uses identity (==) instead of equals
String path = entry.file().path().toString().intern();
Boolean alreadyDeleted = deletedFiles.putIfAbsent(path, true);
diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
index 405d3fe..24c3c2d 100644
--- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
+++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java
@@ -287,7 +287,7 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests> imp
return activeFilesCount;
}
- private void appendEntry(ManifestEntry entry, Object key, int partitionSpecId) {
+ private void appendEntry(ManifestEntry<DataFile> entry, Object key, int partitionSpecId) {
Preconditions.checkNotNull(entry, "Manifest entry cannot be null");
Preconditions.checkNotNull(key, "Key cannot be null");
@@ -323,13 +323,13 @@ public class BaseRewriteManifests extends SnapshotProducer<RewriteManifests> imp
class WriterWrapper {
private final PartitionSpec spec;
- private ManifestWriter writer;
+ private ManifestWriter<DataFile> writer;
WriterWrapper(PartitionSpec spec) {
this.spec = spec;
}
- synchronized void addEntry(ManifestEntry entry) {
+ synchronized void addEntry(ManifestEntry<DataFile> entry) {
if (writer == null) {
writer = newManifestWriter(spec);
} else if (writer.length() >= getManifestTargetSizeBytes()) {
diff --git a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
index 91466c8..4da4b37 100644
--- a/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
+++ b/core/src/main/java/org/apache/iceberg/BaseSnapshot.java
@@ -158,10 +158,10 @@ class BaseSnapshot implements Snapshot {
// read only manifests that were created by this snapshot
Iterable<ManifestFile> changedManifests = Iterables.filter(manifests(),
manifest -> Objects.equal(manifest.snapshotId(), snapshotId));
- try (CloseableIterable<ManifestEntry> entries = new ManifestGroup(io, changedManifests)
+ try (CloseableIterable<ManifestEntry<DataFile>> entries = new ManifestGroup(io, changedManifests)
.ignoreExisting()
.entries()) {
- for (ManifestEntry entry : entries) {
+ for (ManifestEntry<DataFile> entry : entries) {
switch (entry.status()) {
case ADDED:
adds.add(entry.file().copy());
diff --git a/core/src/main/java/org/apache/iceberg/InheritableMetadata.java b/core/src/main/java/org/apache/iceberg/DeleteManifestReader.java
similarity index 58%
copy from core/src/main/java/org/apache/iceberg/InheritableMetadata.java
copy to core/src/main/java/org/apache/iceberg/DeleteManifestReader.java
index 5eebe8a..dfeee6b 100644
--- a/core/src/main/java/org/apache/iceberg/InheritableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/DeleteManifestReader.java
@@ -19,8 +19,22 @@
package org.apache.iceberg;
-import java.io.Serializable;
+import java.util.Map;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
-interface InheritableMetadata extends Serializable {
- ManifestEntry apply(ManifestEntry manifestEntry);
+/**
+ * Reader for delete manifest files.
+ * <p>
+ * Create readers using {@link ManifestFiles#readDeleteManifest(ManifestFile, FileIO, Map)}.
+ */
+public class DeleteManifestReader extends BaseManifestReader<DeleteFile, DeleteManifestReader> {
+ protected DeleteManifestReader(InputFile file, Map<Integer, PartitionSpec> specsById, InheritableMetadata metadata) {
+ super(file, specsById, metadata, FileType.DELETE_FILES);
+ }
+
+ @Override
+ protected DeleteManifestReader self() {
+ return this;
+ }
}
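
Usage sketch (not part of this commit) for the new reader, using the factory method named in the Javadoc above; the inputs are assumed to exist:

    import java.io.IOException;
    import java.util.Map;
    import org.apache.iceberg.DeleteFile;
    import org.apache.iceberg.DeleteManifestReader;
    import org.apache.iceberg.ManifestFile;
    import org.apache.iceberg.ManifestFiles;
    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.io.FileIO;

    class ReadDeleteManifest {
      static void printDeletes(ManifestFile deleteManifest, FileIO io,
                               Map<Integer, PartitionSpec> specsById) throws IOException {
        try (DeleteManifestReader reader =
                 ManifestFiles.readDeleteManifest(deleteManifest, io, specsById)) {
          for (DeleteFile file : reader) {
            System.out.println(file.content() + ": " + file.path());
          }
        }
      }
    }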
diff --git a/core/src/main/java/org/apache/iceberg/FileHistory.java b/core/src/main/java/org/apache/iceberg/FileHistory.java
index 89d8fd3..b6645f7 100644
--- a/core/src/main/java/org/apache/iceberg/FileHistory.java
+++ b/core/src/main/java/org/apache/iceberg/FileHistory.java
@@ -80,7 +80,7 @@ public class FileHistory {
}
@SuppressWarnings("unchecked")
- public Iterable<ManifestEntry> build() {
+ public Iterable<ManifestEntry<?>> build() {
Iterable<Snapshot> snapshots = table.snapshots();
if (startTime != null) {
@@ -100,11 +100,11 @@ public class FileHistory {
// a manifest group will only read each manifest once
ManifestGroup group = new ManifestGroup(((HasTableOperations) table).operations().io(), manifests);
- List<ManifestEntry> results = Lists.newArrayList();
- try (CloseableIterable<ManifestEntry> entries = group.select(HISTORY_COLUMNS).entries()) {
+ List<ManifestEntry<?>> results = Lists.newArrayList();
+ try (CloseableIterable<ManifestEntry<DataFile>> entries = group.select(HISTORY_COLUMNS).entries()) {
// TODO: replace this with an IN predicate
CharSequenceWrapper locationWrapper = CharSequenceWrapper.wrap(null);
- for (ManifestEntry entry : entries) {
+ for (ManifestEntry<?> entry : entries) {
if (entry != null && locations.contains(locationWrapper.set(entry.file().path()))) {
results.add(entry.copy());
}
diff --git a/core/src/main/java/org/apache/iceberg/FindFiles.java b/core/src/main/java/org/apache/iceberg/FindFiles.java
index dfe2021..94e6f8a 100644
--- a/core/src/main/java/org/apache/iceberg/FindFiles.java
+++ b/core/src/main/java/org/apache/iceberg/FindFiles.java
@@ -197,7 +197,7 @@ public class FindFiles {
}
// when snapshot is not null
- CloseableIterable<ManifestEntry> entries = new ManifestGroup(ops.io(), snapshot.manifests())
+ CloseableIterable<ManifestEntry<DataFile>> entries = new ManifestGroup(ops.io(), snapshot.manifests())
.specsById(ops.current().specsById())
.filterData(rowFilter)
.filterFiles(fileFilter)
diff --git a/core/src/main/java/org/apache/iceberg/GenericDataFile.java b/core/src/main/java/org/apache/iceberg/GenericDataFile.java
index 6c6b1ba..4e86587 100644
--- a/core/src/main/java/org/apache/iceberg/GenericDataFile.java
+++ b/core/src/main/java/org/apache/iceberg/GenericDataFile.java
@@ -19,142 +19,43 @@
package org.apache.iceberg;
-import java.io.Serializable;
+import com.google.common.collect.ImmutableMap;
import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
-import java.util.Map;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.avro.specific.SpecificData;
+import org.apache.avro.Schema;
import org.apache.iceberg.avro.AvroSchemaUtil;
-import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.ByteBuffers;
-
-class GenericDataFile
- implements DataFile, IndexedRecord, StructLike, SpecificData.SchemaConstructable, Serializable {
- private static final Types.StructType EMPTY_STRUCT_TYPE = Types.StructType.of();
- private static final PartitionData EMPTY_PARTITION_DATA = new PartitionData(EMPTY_STRUCT_TYPE) {
- @Override
- public PartitionData copy() {
- return this; // this does not change
- }
- };
-
- private int[] fromProjectionPos;
- private Types.StructType partitionType;
-
- private String filePath = null;
- private FileFormat format = null;
- private PartitionData partitionData = null;
- private Long recordCount = null;
- private long fileSizeInBytes = -1L;
-
- // optional fields
- private Map<Integer, Long> columnSizes = null;
- private Map<Integer, Long> valueCounts = null;
- private Map<Integer, Long> nullValueCounts = null;
- private Map<Integer, ByteBuffer> lowerBounds = null;
- private Map<Integer, ByteBuffer> upperBounds = null;
- private List<Long> splitOffsets = null;
- private byte[] keyMetadata = null;
-
- // cached schema
- private transient org.apache.avro.Schema avroSchema = null;
+class GenericDataFile extends BaseFile<DataFile> implements DataFile {
/**
* Used by Avro reflection to instantiate this class when reading manifest files.
*/
- @SuppressWarnings("checkstyle:RedundantModifier") // Must be public
- public GenericDataFile(org.apache.avro.Schema avroSchema) {
- this.avroSchema = avroSchema;
-
- Types.StructType schema = AvroSchemaUtil.convert(avroSchema).asNestedType().asStructType();
-
- // partition type may be null if the field was not projected
- Type partType = schema.fieldType("partition");
- if (partType != null) {
- this.partitionType = partType.asNestedType().asStructType();
- } else {
- this.partitionType = EMPTY_STRUCT_TYPE;
- }
-
- List<Types.NestedField> fields = schema.fields();
- List<Types.NestedField> allFields = DataFile.getType(partitionType).fields();
- this.fromProjectionPos = new int[fields.size()];
- for (int i = 0; i < fromProjectionPos.length; i += 1) {
- boolean found = false;
- for (int j = 0; j < allFields.size(); j += 1) {
- if (fields.get(i).fieldId() == allFields.get(j).fieldId()) {
- found = true;
- fromProjectionPos[i] = j;
- }
- }
-
- if (!found) {
- throw new IllegalArgumentException("Cannot find projected field: " + fields.get(i));
- }
- }
-
- this.partitionData = new PartitionData(partitionType);
+ GenericDataFile(org.apache.avro.Schema avroSchema) {
+ super(avroSchema);
}
GenericDataFile(String filePath, FileFormat format, long recordCount,
long fileSizeInBytes) {
- this.filePath = filePath;
- this.format = format;
- this.partitionData = EMPTY_PARTITION_DATA;
- this.partitionType = EMPTY_PARTITION_DATA.getPartitionType();
- this.recordCount = recordCount;
- this.fileSizeInBytes = fileSizeInBytes;
+ this(filePath, format, null, recordCount, fileSizeInBytes);
}
GenericDataFile(String filePath, FileFormat format, PartitionData partition,
long recordCount, long fileSizeInBytes) {
- this.filePath = filePath;
- this.format = format;
- this.partitionData = partition;
- this.partitionType = partition.getPartitionType();
- this.recordCount = recordCount;
- this.fileSizeInBytes = fileSizeInBytes;
+ super(FileContent.DATA, filePath, format, partition, fileSizeInBytes, recordCount,
+ null, null, null, null, null, null, null);
}
GenericDataFile(String filePath, FileFormat format, PartitionData partition,
long fileSizeInBytes, Metrics metrics, List<Long> splitOffsets) {
- this.filePath = filePath;
- this.format = format;
-
- // this constructor is used by DataFiles.Builder, which passes null for unpartitioned data
- if (partition == null) {
- this.partitionData = EMPTY_PARTITION_DATA;
- this.partitionType = EMPTY_PARTITION_DATA.getPartitionType();
- } else {
- this.partitionData = partition;
- this.partitionType = partition.getPartitionType();
- }
-
- // this will throw NPE if metrics.recordCount is null
- this.recordCount = metrics.recordCount();
- this.fileSizeInBytes = fileSizeInBytes;
- this.columnSizes = metrics.columnSizes();
- this.valueCounts = metrics.valueCounts();
- this.nullValueCounts = metrics.nullValueCounts();
- this.lowerBounds = SerializableByteBufferMap.wrap(metrics.lowerBounds());
- this.upperBounds = SerializableByteBufferMap.wrap(metrics.upperBounds());
- this.splitOffsets = copy(splitOffsets);
+ this(filePath, format, partition, fileSizeInBytes, metrics, null, splitOffsets);
}
GenericDataFile(String filePath, FileFormat format, PartitionData partition,
long fileSizeInBytes, Metrics metrics,
ByteBuffer keyMetadata, List<Long> splitOffsets) {
- this(filePath, format, partition, fileSizeInBytes, metrics, splitOffsets);
- this.keyMetadata = ByteBuffers.toByteArray(keyMetadata);
+ super(FileContent.DATA, filePath, format, partition, fileSizeInBytes, metrics.recordCount(),
+ metrics.columnSizes(), metrics.valueCounts(), metrics.nullValueCounts(),
+ metrics.lowerBounds(), metrics.upperBounds(), splitOffsets, keyMetadata);
}
/**
@@ -164,29 +65,7 @@ class GenericDataFile
* @param fullCopy whether to copy all fields or to drop column-level stats
*/
private GenericDataFile(GenericDataFile toCopy, boolean fullCopy) {
- this.filePath = toCopy.filePath;
- this.format = toCopy.format;
- this.partitionData = toCopy.partitionData.copy();
- this.partitionType = toCopy.partitionType;
- this.recordCount = toCopy.recordCount;
- this.fileSizeInBytes = toCopy.fileSizeInBytes;
- if (fullCopy) {
- // TODO: support lazy conversion to/from map
- this.columnSizes = copy(toCopy.columnSizes);
- this.valueCounts = copy(toCopy.valueCounts);
- this.nullValueCounts = copy(toCopy.nullValueCounts);
- this.lowerBounds = SerializableByteBufferMap.wrap(copy(toCopy.lowerBounds));
- this.upperBounds = SerializableByteBufferMap.wrap(copy(toCopy.upperBounds));
- } else {
- this.columnSizes = null;
- this.valueCounts = null;
- this.nullValueCounts = null;
- this.lowerBounds = null;
- this.upperBounds = null;
- }
- this.fromProjectionPos = toCopy.fromProjectionPos;
- this.keyMetadata = toCopy.keyMetadata == null ? null : Arrays.copyOf(toCopy.keyMetadata, toCopy.keyMetadata.length);
- this.splitOffsets = copy(toCopy.splitOffsets);
+ super(toCopy, fullCopy);
}
/**
@@ -196,213 +75,6 @@ class GenericDataFile
}
@Override
- public FileContent content() {
- return FileContent.DATA;
- }
-
- @Override
- public CharSequence path() {
- return filePath;
- }
-
- @Override
- public FileFormat format() {
- return format;
- }
-
- @Override
- public StructLike partition() {
- return partitionData;
- }
-
- @Override
- public long recordCount() {
- return recordCount;
- }
-
- @Override
- public long fileSizeInBytes() {
- return fileSizeInBytes;
- }
-
- @Override
- public Map<Integer, Long> columnSizes() {
- return columnSizes;
- }
-
- @Override
- public Map<Integer, Long> valueCounts() {
- return valueCounts;
- }
-
- @Override
- public Map<Integer, Long> nullValueCounts() {
- return nullValueCounts;
- }
-
- @Override
- public Map<Integer, ByteBuffer> lowerBounds() {
- return lowerBounds;
- }
-
- @Override
- public Map<Integer, ByteBuffer> upperBounds() {
- return upperBounds;
- }
-
- @Override
- public ByteBuffer keyMetadata() {
- return keyMetadata != null ? ByteBuffer.wrap(keyMetadata) : null;
- }
-
- @Override
- public List<Long> splitOffsets() {
- return splitOffsets;
- }
-
- @Override
- public org.apache.avro.Schema getSchema() {
- if (avroSchema == null) {
- this.avroSchema = getAvroSchema(partitionType);
- }
- return avroSchema;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void put(int i, Object v) {
- int pos = i;
- // if the schema was projected, map the incoming ordinal to the expected one
- if (fromProjectionPos != null) {
- pos = fromProjectionPos[i];
- }
- switch (pos) {
- case 0:
- Preconditions.checkState(v == null || (Integer) v == FileContent.DATA.id(),
- "Invalid content for a DataFile: %s", v);
- return;
- case 1:
- // always coerce to String for Serializable
- this.filePath = v.toString();
- return;
- case 2:
- this.format = FileFormat.valueOf(v.toString());
- return;
- case 3:
- this.partitionData = (PartitionData) v;
- return;
- case 4:
- this.recordCount = (Long) v;
- return;
- case 5:
- this.fileSizeInBytes = (Long) v;
- return;
- case 6:
- this.columnSizes = (Map<Integer, Long>) v;
- return;
- case 7:
- this.valueCounts = (Map<Integer, Long>) v;
- return;
- case 8:
- this.nullValueCounts = (Map<Integer, Long>) v;
- return;
- case 9:
- this.lowerBounds = SerializableByteBufferMap.wrap((Map<Integer, ByteBuffer>) v);
- return;
- case 10:
- this.upperBounds = SerializableByteBufferMap.wrap((Map<Integer, ByteBuffer>) v);
- return;
- case 11:
- this.keyMetadata = ByteBuffers.toByteArray((ByteBuffer) v);
- return;
- case 12:
- this.splitOffsets = (List<Long>) v;
- return;
- default:
- // ignore the object, it must be from a newer version of the format
- }
- }
-
- @Override
- public <T> void set(int pos, T value) {
- put(pos, value);
- }
-
- @Override
- public Object get(int i) {
- int pos = i;
- // if the schema was projected, map the incoming ordinal to the expected one
- if (fromProjectionPos != null) {
- pos = fromProjectionPos[i];
- }
- switch (pos) {
- case 0:
- return FileContent.DATA.id();
- case 1:
- return filePath;
- case 2:
- return format != null ? format.toString() : null;
- case 3:
- return partitionData;
- case 4:
- return recordCount;
- case 5:
- return fileSizeInBytes;
- case 6:
- return columnSizes;
- case 7:
- return valueCounts;
- case 8:
- return nullValueCounts;
- case 9:
- return lowerBounds;
- case 10:
- return upperBounds;
- case 11:
- return keyMetadata();
- case 12:
- return splitOffsets;
- default:
- throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
- }
- }
-
- @Override
- public <T> T get(int pos, Class<T> javaClass) {
- return javaClass.cast(get(pos));
- }
-
- private static org.apache.avro.Schema getAvroSchema(Types.StructType partitionType) {
- Types.StructType type = DataFile.getType(partitionType);
- return AvroSchemaUtil.convert(type, ImmutableMap.of(
- type, GenericDataFile.class.getName(),
- partitionType, PartitionData.class.getName()));
- }
-
- @Override
- public int size() {
- return DataFile.getType(EMPTY_STRUCT_TYPE).fields().size();
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("file_path", filePath)
- .add("file_format", format)
- .add("partition", partitionData)
- .add("record_count", recordCount)
- .add("file_size_in_bytes", fileSizeInBytes)
- .add("column_sizes", columnSizes)
- .add("value_counts", valueCounts)
- .add("null_value_counts", nullValueCounts)
- .add("lower_bounds", lowerBounds)
- .add("upper_bounds", upperBounds)
- .add("key_metadata", keyMetadata == null ? "null" : "(redacted)")
- .add("split_offsets", splitOffsets == null ? "null" : splitOffsets)
- .toString();
- }
-
- @Override
public DataFile copyWithoutStats() {
return new GenericDataFile(this, false /* drop stats */);
}
@@ -412,21 +84,10 @@ class GenericDataFile
return new GenericDataFile(this, true /* full copy */);
}
- private static <K, V> Map<K, V> copy(Map<K, V> map) {
- if (map != null) {
- Map<K, V> copy = Maps.newHashMapWithExpectedSize(map.size());
- copy.putAll(map);
- return Collections.unmodifiableMap(copy);
- }
- return null;
- }
-
- private static <E> List<E> copy(List<E> list) {
- if (list != null) {
- List<E> copy = Lists.newArrayListWithExpectedSize(list.size());
- copy.addAll(list);
- return Collections.unmodifiableList(copy);
- }
- return null;
+ protected Schema getAvroSchema(Types.StructType partitionStruct) {
+ Types.StructType type = DataFile.getType(partitionStruct);
+ return AvroSchemaUtil.convert(type, ImmutableMap.of(
+ type, GenericDataFile.class.getName(),
+ partitionStruct, PartitionData.class.getName()));
}
}
diff --git a/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java b/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java
new file mode 100644
index 0000000..f4a28d0
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+
+import com.google.common.collect.ImmutableMap;
+import java.nio.ByteBuffer;
+import org.apache.avro.Schema;
+import org.apache.iceberg.avro.AvroSchemaUtil;
+import org.apache.iceberg.types.Types;
+
+class GenericDeleteFile extends BaseFile<DeleteFile> implements DeleteFile {
+ /**
+ * Used by Avro reflection to instantiate this class when reading manifest files.
+ */
+ GenericDeleteFile(Schema avroSchema) {
+ super(avroSchema);
+ }
+
+ GenericDeleteFile(FileContent content, String filePath, FileFormat format, PartitionData partition,
+ long fileSizeInBytes, Metrics metrics, ByteBuffer keyMetadata) {
+ super(content, filePath, format, partition, fileSizeInBytes, metrics.recordCount(),
+ metrics.columnSizes(), metrics.valueCounts(), metrics.nullValueCounts(),
+ metrics.lowerBounds(), metrics.upperBounds(), null, keyMetadata);
+ }
+
+ /**
+ * Copy constructor.
+ *
+ * @param toCopy a generic delete file to copy.
+ * @param fullCopy whether to copy all fields or to drop column-level stats
+ */
+ private GenericDeleteFile(GenericDeleteFile toCopy, boolean fullCopy) {
+ super(toCopy, fullCopy);
+ }
+
+ /**
+ * Constructor for Java serialization.
+ */
+ GenericDeleteFile() {
+ }
+
+ @Override
+ public DeleteFile copyWithoutStats() {
+ return new GenericDeleteFile(this, false /* drop stats */);
+ }
+
+ @Override
+ public DeleteFile copy() {
+ return new GenericDeleteFile(this, true /* full copy */);
+ }
+
+ protected Schema getAvroSchema(Types.StructType partitionStruct) {
+ Types.StructType type = DataFile.getType(partitionStruct);
+ return AvroSchemaUtil.convert(type, ImmutableMap.of(
+ type, GenericDeleteFile.class.getName(),
+ partitionStruct, PartitionData.class.getName()));
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java b/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java
index cc27818..a4bc6f8 100644
--- a/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java
+++ b/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java
@@ -25,27 +25,24 @@ import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.types.Types;
-class GenericManifestEntry implements ManifestEntry, IndexedRecord, SpecificData.SchemaConstructable, StructLike {
+class GenericManifestEntry<F extends ContentFile<F>>
+ implements ManifestEntry<F>, IndexedRecord, SpecificData.SchemaConstructable, StructLike {
private final org.apache.avro.Schema schema;
- private final V1Metadata.IndexedDataFile fileWrapper;
private Status status = Status.EXISTING;
private Long snapshotId = null;
private Long sequenceNumber = null;
- private DataFile file = null;
+ private F file = null;
GenericManifestEntry(org.apache.avro.Schema schema) {
this.schema = schema;
- this.fileWrapper = null; // do not use the file wrapper to read
}
GenericManifestEntry(Types.StructType partitionType) {
this.schema = AvroSchemaUtil.convert(V1Metadata.entrySchema(partitionType), "manifest_entry");
- this.fileWrapper = new V1Metadata.IndexedDataFile(schema.getField("data_file").schema());
}
- private GenericManifestEntry(GenericManifestEntry toCopy, boolean fullCopy) {
+ private GenericManifestEntry(GenericManifestEntry<F> toCopy, boolean fullCopy) {
this.schema = toCopy.schema;
- this.fileWrapper = new V1Metadata.IndexedDataFile(schema.getField("data_file").schema());
this.status = toCopy.status;
this.snapshotId = toCopy.snapshotId;
if (fullCopy) {
@@ -55,7 +52,7 @@ class GenericManifestEntry implements ManifestEntry, IndexedRecord, SpecificData
}
}
- ManifestEntry wrapExisting(Long newSnapshotId, Long newSequenceNumber, DataFile newFile) {
+ ManifestEntry<F> wrapExisting(Long newSnapshotId, Long newSequenceNumber, F newFile) {
this.status = Status.EXISTING;
this.snapshotId = newSnapshotId;
this.sequenceNumber = newSequenceNumber;
@@ -63,7 +60,7 @@ class GenericManifestEntry implements ManifestEntry, IndexedRecord, SpecificData
return this;
}
- ManifestEntry wrapAppend(Long newSnapshotId, DataFile newFile) {
+ ManifestEntry<F> wrapAppend(Long newSnapshotId, F newFile) {
this.status = Status.ADDED;
this.snapshotId = newSnapshotId;
this.sequenceNumber = null;
@@ -71,7 +68,7 @@ class GenericManifestEntry implements ManifestEntry, IndexedRecord, SpecificData
return this;
}
- ManifestEntry wrapDelete(Long newSnapshotId, DataFile newFile) {
+ ManifestEntry<F> wrapDelete(Long newSnapshotId, F newFile) {
this.status = Status.DELETED;
this.snapshotId = newSnapshotId;
this.sequenceNumber = null;
@@ -104,18 +101,18 @@ class GenericManifestEntry implements ManifestEntry, IndexedRecord, SpecificData
* @return a file
*/
@Override
- public DataFile file() {
+ public F file() {
return file;
}
@Override
- public ManifestEntry copy() {
- return new GenericManifestEntry(this, true /* full copy */);
+ public ManifestEntry<F> copy() {
+ return new GenericManifestEntry<>(this, true /* full copy */);
}
@Override
- public ManifestEntry copyWithoutStats() {
- return new GenericManifestEntry(this, false /* drop stats */);
+ public ManifestEntry<F> copyWithoutStats() {
+ return new GenericManifestEntry<>(this, false /* drop stats */);
}
@Override
@@ -129,6 +126,7 @@ class GenericManifestEntry implements ManifestEntry, IndexedRecord, SpecificData
}
@Override
+ @SuppressWarnings("unchecked")
public void put(int i, Object v) {
switch (i) {
case 0:
@@ -141,7 +139,7 @@ class GenericManifestEntry implements ManifestEntry, IndexedRecord, SpecificData
this.sequenceNumber = (Long) v;
return;
case 3:
- this.file = (DataFile) v;
+ this.file = (F) v;
return;
default:
// ignore the object, it must be from a newer version of the format
@@ -163,11 +161,7 @@ class GenericManifestEntry implements ManifestEntry, IndexedRecord, SpecificData
case 2:
return sequenceNumber;
case 3:
- if (fileWrapper == null || file instanceof GenericDataFile) {
- return file;
- } else {
- return fileWrapper.wrap(file);
- }
+ return file;
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + i);
}
diff --git a/core/src/main/java/org/apache/iceberg/InheritableMetadata.java b/core/src/main/java/org/apache/iceberg/InheritableMetadata.java
index 5eebe8a..44e0521 100644
--- a/core/src/main/java/org/apache/iceberg/InheritableMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/InheritableMetadata.java
@@ -22,5 +22,5 @@ package org.apache.iceberg;
import java.io.Serializable;
interface InheritableMetadata extends Serializable {
- ManifestEntry apply(ManifestEntry manifestEntry);
+ <F extends ContentFile<F>> ManifestEntry<F> apply(ManifestEntry<F> manifestEntry);
}
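
Sketch (not part of this commit): the generic apply lets a single implementation stamp entries of either file type, mirroring the factory implementations changed below. This hypothetical class would live in the org.apache.iceberg package, since the interface is package-private:

    package org.apache.iceberg;

    class ExampleInheritableMetadata implements InheritableMetadata {
      private final long snapshotId;

      ExampleInheritableMetadata(long snapshotId) {
        this.snapshotId = snapshotId;
      }

      @Override
      public <F extends ContentFile<F>> ManifestEntry<F> apply(ManifestEntry<F> entry) {
        if (entry.snapshotId() == null) {
          entry.setSnapshotId(snapshotId); // inherit from the owning manifest
        }
        return entry;
      }
    }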
diff --git a/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java b/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java
index 3c271bc..d8ceb40 100644
--- a/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java
+++ b/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java
@@ -51,7 +51,7 @@ class InheritableMetadataFactory {
}
@Override
- public ManifestEntry apply(ManifestEntry manifestEntry) {
+ public <F extends ContentFile<F>> ManifestEntry<F> apply(ManifestEntry<F> manifestEntry) {
if (manifestEntry.snapshotId() == null) {
manifestEntry.setSnapshotId(snapshotId);
}
@@ -70,7 +70,7 @@ class InheritableMetadataFactory {
}
@Override
- public ManifestEntry apply(ManifestEntry manifestEntry) {
+ public <F extends ContentFile<F>> ManifestEntry<F> apply(ManifestEntry<F> manifestEntry) {
manifestEntry.setSnapshotId(snapshotId);
return manifestEntry;
}
@@ -81,7 +81,7 @@ class InheritableMetadataFactory {
private EmptyInheritableMetadata() {}
@Override
- public ManifestEntry apply(ManifestEntry manifestEntry) {
+ public <F extends ContentFile<F>> ManifestEntry<F> apply(ManifestEntry<F> manifestEntry) {
if (manifestEntry.snapshotId() == null) {
throw new IllegalArgumentException("Entries must have explicit snapshot ids if inherited metadata is empty");
}
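The per-call type parameter on apply() above means a single InheritableMetadata instance can stamp snapshot IDs onto both data and delete entries. A minimal sketch of that call pattern, where `manifest`, `rawDataEntry`, and `rawDeleteEntry` are hypothetical caller-supplied values, not identifiers from this commit:
// Sketch only: one metadata instance serves both entry kinds.
InheritableMetadata metadata = InheritableMetadataFactory.fromManifest(manifest);
ManifestEntry<DataFile> dataEntry = metadata.apply(rawDataEntry);
ManifestEntry<DeleteFile> deleteEntry = metadata.apply(rawDeleteEntry);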
diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
index 9e4b896..061230f 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestEntriesTable.java
@@ -129,7 +129,7 @@ public class ManifestEntriesTable extends BaseMetadataTable {
public CloseableIterable<StructLike> rows() {
return CloseableIterable.transform(
ManifestFiles.read(manifest, io).project(fileSchema).entries(),
- file -> (GenericManifestEntry) file);
+ file -> (GenericManifestEntry<?>) file);
}
@Override
diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntry.java b/core/src/main/java/org/apache/iceberg/ManifestEntry.java
index bc03850..e07b125 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestEntry.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestEntry.java
@@ -25,7 +25,7 @@ import org.apache.iceberg.types.Types.StructType;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
-interface ManifestEntry {
+interface ManifestEntry<F extends ContentFile<F>> {
enum Status {
EXISTING(0),
ADDED(1),
@@ -89,9 +89,9 @@ interface ManifestEntry {
/**
* @return a file
*/
- DataFile file();
+ F file();
- ManifestEntry copy();
+ ManifestEntry<F> copy();
- ManifestEntry copyWithoutStats();
+ ManifestEntry<F> copyWithoutStats();
}
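The recursive bound on the new type parameter mirrors ContentFile's own declaration, so file().copy() yields the concrete file type without casts. A hypothetical helper, not part of the commit, illustrating the effect:
// Hypothetical helper: works for ManifestEntry<DataFile> and ManifestEntry<DeleteFile>
// alike, because ContentFile<F>.copy() returns F.
static <F extends ContentFile<F>> F copyFileOf(ManifestEntry<F> entry) {
  return entry.file().copy();
}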
diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
index ea72e2f..730badf 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
@@ -55,6 +55,8 @@ public class ManifestFiles {
* @return a {@link ManifestReader}
*/
public static ManifestReader read(ManifestFile manifest, FileIO io, Map<Integer, PartitionSpec> specsById) {
+ Preconditions.checkArgument(manifest.content() == ManifestContent.DATA,
+ "Cannot read a delete manifest with a ManifestReader: %s", manifest);
InputFile file = io.newInputFile(manifest.path());
InheritableMetadata inheritableMetadata = InheritableMetadataFactory.fromManifest(manifest);
return new ManifestReader(file, specsById, inheritableMetadata);
@@ -70,7 +72,7 @@ public class ManifestFiles {
* @param outputFile the destination file location
* @return a manifest writer
*/
- public static ManifestWriter write(PartitionSpec spec, OutputFile outputFile) {
+ public static ManifestWriter<DataFile> write(PartitionSpec spec, OutputFile outputFile) {
return write(1, spec, outputFile, null);
}
@@ -83,7 +85,8 @@ public class ManifestFiles {
* @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID
* @return a manifest writer
*/
- public static ManifestWriter write(int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) {
+ public static ManifestWriter<DataFile> write(int formatVersion, PartitionSpec spec, OutputFile outputFile,
+ Long snapshotId) {
switch (formatVersion) {
case 1:
return new ManifestWriter.V1Writer(spec, outputFile, snapshotId);
@@ -93,6 +96,43 @@ public class ManifestFiles {
throw new UnsupportedOperationException("Cannot write manifest for table version: " + formatVersion);
}
+ /**
+ * Returns a new {@link DeleteManifestReader} for a {@link ManifestFile}.
+ *
+ * @param manifest a {@link ManifestFile}
+ * @param io a {@link FileIO}
+ * @param specsById a Map from spec ID to partition spec
+ * @return a {@link DeleteManifestReader}
+ */
+ public static DeleteManifestReader readDeleteManifest(ManifestFile manifest, FileIO io,
+ Map<Integer, PartitionSpec> specsById) {
+ Preconditions.checkArgument(manifest.content() == ManifestContent.DELETES,
+ "Cannot read a data manifest with a DeleteManifestReader: %s", manifest);
+ InputFile file = io.newInputFile(manifest.path());
+ InheritableMetadata inheritableMetadata = InheritableMetadataFactory.fromManifest(manifest);
+ return new DeleteManifestReader(file, specsById, inheritableMetadata);
+ }
+
+ /**
+ * Create a new {@link ManifestWriter} for delete files in the given format version.
+ *
+ * @param formatVersion a target format version
+ * @param spec a {@link PartitionSpec}
+ * @param outputFile an {@link OutputFile} where the manifest will be written
+ * @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID
+ * @return a delete manifest writer
+ */
+ public static ManifestWriter<DeleteFile> writeDeleteManifest(int formatVersion, PartitionSpec spec,
+ OutputFile outputFile, Long snapshotId) {
+ switch (formatVersion) {
+ case 1:
+ throw new IllegalArgumentException("Cannot write delete files in a v1 table");
+ case 2:
+ return new ManifestWriter.V2DeleteWriter(spec, outputFile, snapshotId);
+ }
+ throw new UnsupportedOperationException("Cannot write manifest for table version: " + formatVersion);
+ }
+
static ManifestFile copyAppendManifest(int formatVersion,
InputFile toCopy, Map<Integer, PartitionSpec> specsById,
OutputFile outputFile, long snapshotId,
@@ -124,10 +164,10 @@ public class ManifestFiles {
private static ManifestFile copyManifestInternal(int formatVersion, ManifestReader reader, OutputFile outputFile,
long snapshotId, SnapshotSummary.Builder summaryBuilder,
ManifestEntry.Status allowedEntryStatus) {
- ManifestWriter writer = write(formatVersion, reader.spec(), outputFile, snapshotId);
+ ManifestWriter<DataFile> writer = write(formatVersion, reader.spec(), outputFile, snapshotId);
boolean threw = true;
try {
- for (ManifestEntry entry : reader.entries()) {
+ for (ManifestEntry<DataFile> entry : reader.entries()) {
Preconditions.checkArgument(
allowedEntryStatus == entry.status(),
"Invalid manifest entry status: %s (allowed status: %s)",
diff --git a/core/src/main/java/org/apache/iceberg/ManifestGroup.java b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
index b98176e..3aca5f9 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestGroup.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestGroup.java
@@ -47,7 +47,7 @@ class ManifestGroup {
private final FileIO io;
private final Set<ManifestFile> manifests;
private Predicate<ManifestFile> manifestPredicate;
- private Predicate<ManifestEntry> manifestEntryPredicate;
+ private Predicate<ManifestEntry<DataFile>> manifestEntryPredicate;
private Map<Integer, PartitionSpec> specsById;
private Expression dataFilter;
private Expression fileFilter;
@@ -97,7 +97,7 @@ class ManifestGroup {
return this;
}
- ManifestGroup filterManifestEntries(Predicate<ManifestEntry> newManifestEntryPredicate) {
+ ManifestGroup filterManifestEntries(Predicate<ManifestEntry<DataFile>> newManifestEntryPredicate) {
this.manifestEntryPredicate = manifestEntryPredicate.and(newManifestEntryPredicate);
return this;
}
@@ -169,12 +169,12 @@ class ManifestGroup {
*
* @return a CloseableIterable of manifest entries.
*/
- public CloseableIterable<ManifestEntry> entries() {
+ public CloseableIterable<ManifestEntry<DataFile>> entries() {
return CloseableIterable.concat(entries((manifest, entries) -> entries));
}
private <T> Iterable<CloseableIterable<T>> entries(
- BiFunction<ManifestFile, CloseableIterable<ManifestEntry>, CloseableIterable<T>> entryFn) {
+ BiFunction<ManifestFile, CloseableIterable<ManifestEntry<DataFile>>, CloseableIterable<T>> entryFn) {
LoadingCache<Integer, ManifestEvaluator> evalCache = specsById == null ?
null : Caffeine.newBuilder().build(specId -> {
PartitionSpec spec = specsById.get(specId);
@@ -215,7 +215,7 @@ class ManifestGroup {
.caseSensitive(caseSensitive)
.select(columns);
- CloseableIterable<ManifestEntry> entries = reader.entries();
+ CloseableIterable<ManifestEntry<DataFile>> entries = reader.entries();
if (ignoreDeleted) {
entries = reader.liveEntries();
}
diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
index ac10f4c..f73390b 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
@@ -28,33 +28,19 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
/**
* Writer for manifest files.
+ *
+ * @param <F> Java class of files written to the manifest, either {@link DataFile} or {@link DeleteFile}.
*/
-public abstract class ManifestWriter implements FileAppender<DataFile> {
+public abstract class ManifestWriter<F extends ContentFile<F>> implements FileAppender<F> {
// stand-in for the current sequence number that will be assigned when the commit is successful
// this is replaced when writing a manifest list by the ManifestFile wrapper
static final long UNASSIGNED_SEQ = -1L;
- /**
- * Create a new {@link ManifestWriter}.
- * <p>
- * Manifests created by this writer have all entry snapshot IDs set to null.
- * All entries will inherit the snapshot ID that will be assigned to the manifest on commit.
- *
- * @param spec {@link PartitionSpec} used to produce {@link DataFile} partition tuples
- * @param outputFile the destination file location
- * @return a manifest writer
- * @deprecated will be removed in 0.9.0; use {@link ManifestFiles#write(PartitionSpec, OutputFile)} instead.
- */
- @Deprecated
- public static ManifestWriter write(PartitionSpec spec, OutputFile outputFile) {
- return ManifestFiles.write(spec, outputFile);
- }
-
private final OutputFile file;
private final int specId;
- private final FileAppender<ManifestEntry> writer;
+ private final FileAppender<ManifestEntry<F>> writer;
private final Long snapshotId;
- private final GenericManifestEntry reused;
+ private final GenericManifestEntry<F> reused;
private final PartitionSummary stats;
private boolean closed = false;
@@ -71,15 +57,19 @@ public abstract class ManifestWriter implements FileAppender<DataFile> {
this.specId = spec.specId();
this.writer = newAppender(spec, file);
this.snapshotId = snapshotId;
- this.reused = new GenericManifestEntry(spec.partitionType());
+ this.reused = new GenericManifestEntry<>(spec.partitionType());
this.stats = new PartitionSummary(spec);
}
- protected abstract ManifestEntry prepare(ManifestEntry entry);
+ protected abstract ManifestEntry<F> prepare(ManifestEntry<F> entry);
- protected abstract FileAppender<ManifestEntry> newAppender(PartitionSpec spec, OutputFile outputFile);
+ protected abstract FileAppender<ManifestEntry<F>> newAppender(PartitionSpec spec, OutputFile outputFile);
- void addEntry(ManifestEntry entry) {
+ protected ManifestContent content() {
+ return ManifestContent.DATA;
+ }
+
+ void addEntry(ManifestEntry<F> entry) {
switch (entry.status()) {
case ADDED:
addedFiles += 1;
@@ -102,48 +92,48 @@ public abstract class ManifestWriter implements FileAppender<DataFile> {
}
/**
- * Add an added entry for a data file.
+ * Add an added entry for a file.
* <p>
* The entry's snapshot ID will be this manifest's snapshot ID.
*
* @param addedFile a file
*/
@Override
- public void add(DataFile addedFile) {
+ public void add(F addedFile) {
addEntry(reused.wrapAppend(snapshotId, addedFile));
}
- void add(ManifestEntry entry) {
+ void add(ManifestEntry<F> entry) {
addEntry(reused.wrapAppend(snapshotId, entry.file()));
}
/**
- * Add an existing entry for a data file.
+ * Add an existing entry for a file.
*
- * @param existingFile a data file
+ * @param existingFile a file
* @param fileSnapshotId snapshot ID when the file was added to the table
* @param sequenceNumber sequence number for the file
*/
- public void existing(DataFile existingFile, long fileSnapshotId, long sequenceNumber) {
+ public void existing(F existingFile, long fileSnapshotId, long sequenceNumber) {
addEntry(reused.wrapExisting(fileSnapshotId, sequenceNumber, existingFile));
}
- void existing(ManifestEntry entry) {
+ void existing(ManifestEntry<F> entry) {
addEntry(reused.wrapExisting(entry.snapshotId(), entry.sequenceNumber(), entry.file()));
}
/**
- * Add a delete entry for a data file.
+ * Add a delete entry for a file.
* <p>
* The entry's snapshot ID will be this manifest's snapshot ID.
*
- * @param deletedFile a data file
+ * @param deletedFile a file
*/
- public void delete(DataFile deletedFile) {
+ public void delete(F deletedFile) {
addEntry(reused.wrapDelete(snapshotId, deletedFile));
}
- void delete(ManifestEntry entry) {
+ void delete(ManifestEntry<F> entry) {
// Use the current Snapshot ID for the delete. It is safe to delete the data file from disk
// when this Snapshot has been removed or when there are no Snapshots older than this one.
addEntry(reused.wrapDelete(snapshotId, entry.file()));
@@ -164,7 +154,7 @@ public abstract class ManifestWriter implements FileAppender<DataFile> {
// if the minSequenceNumber is null, then no manifests with a sequence number have been written, so the min
// sequence number is the one that will be assigned when this is committed. pass UNASSIGNED_SEQ to inherit it.
long minSeqNumber = minSequenceNumber != null ? minSequenceNumber : UNASSIGNED_SEQ;
- return new GenericManifestFile(file.location(), writer.length(), specId, ManifestContent.DATA,
+ return new GenericManifestFile(file.location(), writer.length(), specId, content(),
UNASSIGNED_SEQ, minSeqNumber, snapshotId,
addedFiles, addedRows, existingFiles, existingRows, deletedFiles, deletedRows, stats.summaries());
}
@@ -175,21 +165,54 @@ public abstract class ManifestWriter implements FileAppender<DataFile> {
writer.close();
}
- static class V2Writer extends ManifestWriter {
- private V2Metadata.IndexedManifestEntry entryWrapper;
+ static class V2Writer extends ManifestWriter<DataFile> {
+ private final V2Metadata.IndexedManifestEntry<DataFile> entryWrapper;
V2Writer(PartitionSpec spec, OutputFile file, Long snapshotId) {
super(spec, file, snapshotId);
- this.entryWrapper = new V2Metadata.IndexedManifestEntry(snapshotId, spec.partitionType());
+ this.entryWrapper = new V2Metadata.IndexedManifestEntry<>(snapshotId, spec.partitionType());
+ }
+
+ @Override
+ protected ManifestEntry<DataFile> prepare(ManifestEntry<DataFile> entry) {
+ return entryWrapper.wrap(entry);
+ }
+
+ @Override
+ protected FileAppender<ManifestEntry<DataFile>> newAppender(PartitionSpec spec, OutputFile file) {
+ Schema manifestSchema = V2Metadata.entrySchema(spec.partitionType());
+ try {
+ return Avro.write(file)
+ .schema(manifestSchema)
+ .named("manifest_entry")
+ .meta("schema", SchemaParser.toJson(spec.schema()))
+ .meta("partition-spec", PartitionSpecParser.toJsonFields(spec))
+ .meta("partition-spec-id", String.valueOf(spec.specId()))
+ .meta("format-version", "2")
+ .meta("content", "data")
+ .overwrite()
+ .build();
+ } catch (IOException e) {
+ throw new RuntimeIOException(e, "Failed to create manifest writer for path: " + file);
+ }
+ }
+ }
+
+ static class V2DeleteWriter extends ManifestWriter<DeleteFile> {
+ private final V2Metadata.IndexedManifestEntry<DeleteFile> entryWrapper;
+
+ V2DeleteWriter(PartitionSpec spec, OutputFile file, Long snapshotId) {
+ super(spec, file, snapshotId);
+ this.entryWrapper = new V2Metadata.IndexedManifestEntry<>(snapshotId, spec.partitionType());
}
@Override
- protected ManifestEntry prepare(ManifestEntry entry) {
+ protected ManifestEntry<DeleteFile> prepare(ManifestEntry<DeleteFile> entry) {
return entryWrapper.wrap(entry);
}
@Override
- protected FileAppender<ManifestEntry> newAppender(PartitionSpec spec, OutputFile file) {
+ protected FileAppender<ManifestEntry<DeleteFile>> newAppender(PartitionSpec spec, OutputFile file) {
Schema manifestSchema = V2Metadata.entrySchema(spec.partitionType());
try {
return Avro.write(file)
@@ -199,16 +222,22 @@ public abstract class ManifestWriter implements FileAppender<DataFile> {
.meta("partition-spec", PartitionSpecParser.toJsonFields(spec))
.meta("partition-spec-id", String.valueOf(spec.specId()))
.meta("format-version", "2")
+ .meta("content", "deletes")
.overwrite()
.build();
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to create manifest writer for path: " + file);
}
}
+
+ @Override
+ protected ManifestContent content() {
+ return ManifestContent.DELETES;
+ }
}
- static class V1Writer extends ManifestWriter {
- private V1Metadata.IndexedManifestEntry entryWrapper;
+ static class V1Writer extends ManifestWriter<DataFile> {
+ private final V1Metadata.IndexedManifestEntry entryWrapper;
V1Writer(PartitionSpec spec, OutputFile file, Long snapshotId) {
super(spec, file, snapshotId);
@@ -216,12 +245,12 @@ public abstract class ManifestWriter implements FileAppender<DataFile> {
}
@Override
- protected ManifestEntry prepare(ManifestEntry entry) {
+ protected ManifestEntry<DataFile> prepare(ManifestEntry<DataFile> entry) {
return entryWrapper.wrap(entry);
}
@Override
- protected FileAppender<ManifestEntry> newAppender(PartitionSpec spec, OutputFile file) {
+ protected FileAppender<ManifestEntry<DataFile>> newAppender(PartitionSpec spec, OutputFile file) {
Schema manifestSchema = V1Metadata.entrySchema(spec.partitionType());
try {
return Avro.write(file)
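A design note on the writer hierarchy above: content() is a template-method hook with a DATA default, so V2DeleteWriter overrides only the one property that differs and the base class stamps it into the manifest in toManifestFile(). A hedged sketch of the visible effect, where `dataWriter` and `deleteWriter` are hypothetical, fully written writers:
// Sketch only: the content() override is what flips the manifest's content field.
assert dataWriter.toManifestFile().content() == ManifestContent.DATA;
assert deleteWriter.toManifestFile().content() == ManifestContent.DELETES;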
diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
index 0ba09db..f8fbe90 100644
--- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java
@@ -526,7 +526,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
Evaluator inclusive = extractInclusiveDeleteExpression(reader);
Evaluator strict = extractStrictDeleteExpression(reader);
boolean hasDeletedFiles = false;
- for (ManifestEntry entry : reader.entries()) {
+ for (ManifestEntry<DataFile> entry : reader.entries()) {
DataFile file = entry.file();
boolean fileDelete = deletePaths.contains(pathWrapper.set(file.path())) ||
dropPartitions.contains(partitionWrapper.set(file.partition()));
@@ -555,7 +555,7 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
// manifest. produce a copy of the manifest with all deleted files removed.
List<DataFile> deletedFiles = Lists.newArrayList();
Set<CharSequenceWrapper> deletedPaths = Sets.newHashSet();
- ManifestWriter writer = newManifestWriter(reader.spec());
+ ManifestWriter<DataFile> writer = newManifestWriter(reader.spec());
try {
reader.entries().forEach(entry -> {
DataFile file = entry.file();
@@ -667,11 +667,11 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
return mergeManifests.get(bin);
}
- ManifestWriter writer = newManifestWriter(ops.current().spec());
+ ManifestWriter<DataFile> writer = newManifestWriter(ops.current().spec());
try {
for (ManifestFile manifest : bin) {
try (ManifestReader reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) {
- for (ManifestEntry entry : reader.entries()) {
+ for (ManifestEntry<DataFile> entry : reader.entries()) {
if (entry.status() == Status.DELETED) {
// suppress deletes from previous snapshots. only files deleted by this snapshot
// should be added to the new manifest
diff --git a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
index 2464580..f927728 100644
--- a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
+++ b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java
@@ -350,7 +350,7 @@ class RemoveSnapshots implements ExpireSnapshots {
.run(manifest -> {
// the manifest has deletes, scan it to find files to delete
try (ManifestReader reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) {
- for (ManifestEntry entry : reader.entries()) {
+ for (ManifestEntry<?> entry : reader.entries()) {
// if the snapshot ID of the DELETE entry is no longer valid, the data can be deleted
if (entry.status() == ManifestEntry.Status.DELETED &&
!validIds.contains(entry.snapshotId())) {
@@ -370,7 +370,7 @@ class RemoveSnapshots implements ExpireSnapshots {
.run(manifest -> {
// the manifest has deletes, scan it to find files to delete
try (ManifestReader reader = ManifestFiles.read(manifest, ops.io(), ops.current().specsById())) {
- for (ManifestEntry entry : reader.entries()) {
+ for (ManifestEntry<?> entry : reader.entries()) {
// delete any ADDED file from manifests that were reverted
if (entry.status() == ManifestEntry.Status.ADDED) {
// use toString to ensure the path will not change (Utf8 is reused)
diff --git a/core/src/main/java/org/apache/iceberg/ScanSummary.java b/core/src/main/java/org/apache/iceberg/ScanSummary.java
index 126ada8..f93f785 100644
--- a/core/src/main/java/org/apache/iceberg/ScanSummary.java
+++ b/core/src/main/java/org/apache/iceberg/ScanSummary.java
@@ -218,7 +218,7 @@ public class ScanSummary {
TopN<String, PartitionMetrics> topN = new TopN<>(
limit, throwIfLimited, Comparators.charSequences());
- try (CloseableIterable<ManifestEntry> entries = new ManifestGroup(ops.io(), manifests)
+ try (CloseableIterable<ManifestEntry<DataFile>> entries = new ManifestGroup(ops.io(), manifests)
.specsById(ops.current().specsById())
.filterData(rowFilter)
.ignoreDeleted()
@@ -226,7 +226,7 @@ public class ScanSummary {
.entries()) {
PartitionSpec spec = table.spec();
- for (ManifestEntry entry : entries) {
+ for (ManifestEntry<?> entry : entries) {
Long timestamp = snapshotTimestamps.get(entry.snapshotId());
// if filtering, skip timestamps that are outside the range
@@ -280,7 +280,7 @@ public class ScanSummary {
return this;
}
- private PartitionMetrics updateFromFile(DataFile file, Long timestampMillis) {
+ private PartitionMetrics updateFromFile(ContentFile<?> file, Long timestampMillis) {
this.fileCount += 1;
this.recordCount += file.recordCount();
this.totalSize += file.fileSizeInBytes();
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 012f7fe..e4d8adc 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -331,7 +331,7 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
ops.metadataFileLocation(FileFormat.AVRO.addExtension(commitUUID + "-m" + manifestCount.getAndIncrement())));
}
- protected ManifestWriter newManifestWriter(PartitionSpec spec) {
+ protected ManifestWriter<DataFile> newManifestWriter(PartitionSpec spec) {
return ManifestFiles.write(ops.current().formatVersion(), spec, newManifestOutput(), snapshotId());
}
@@ -358,7 +358,7 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
Long snapshotId = null;
long maxSnapshotId = Long.MIN_VALUE;
- for (ManifestEntry entry : reader.entries()) {
+ for (ManifestEntry<DataFile> entry : reader.entries()) {
if (entry.snapshotId() > maxSnapshotId) {
maxSnapshotId = entry.snapshotId();
}
diff --git a/core/src/main/java/org/apache/iceberg/V1Metadata.java b/core/src/main/java/org/apache/iceberg/V1Metadata.java
index 3f38cd3..5615ee1 100644
--- a/core/src/main/java/org/apache/iceberg/V1Metadata.java
+++ b/core/src/main/java/org/apache/iceberg/V1Metadata.java
@@ -220,17 +220,17 @@ class V1Metadata {
/**
* Wrapper used to write a ManifestEntry to v1 metadata.
*/
- static class IndexedManifestEntry implements ManifestEntry, IndexedRecord {
+ static class IndexedManifestEntry implements ManifestEntry<DataFile>, IndexedRecord {
private final org.apache.avro.Schema avroSchema;
private final IndexedDataFile fileWrapper;
- private ManifestEntry wrapped = null;
+ private ManifestEntry<DataFile> wrapped = null;
IndexedManifestEntry(Types.StructType partitionType) {
this.avroSchema = AvroSchemaUtil.convert(entrySchema(partitionType), "manifest_entry");
this.fileWrapper = new IndexedDataFile(avroSchema.getField("data_file").schema());
}
- public IndexedManifestEntry wrap(ManifestEntry entry) {
+ public IndexedManifestEntry wrap(ManifestEntry<DataFile> entry) {
this.wrapped = entry;
return this;
}
@@ -294,12 +294,12 @@ class V1Metadata {
}
@Override
- public ManifestEntry copy() {
+ public ManifestEntry<DataFile> copy() {
return wrapped.copy();
}
@Override
- public ManifestEntry copyWithoutStats() {
+ public ManifestEntry<DataFile> copyWithoutStats() {
return wrapped.copyWithoutStats();
}
}
@@ -365,6 +365,11 @@ class V1Metadata {
}
@Override
+ public FileContent content() {
+ return wrapped.content();
+ }
+
+ @Override
public CharSequence path() {
return wrapped.path();
}
diff --git a/core/src/main/java/org/apache/iceberg/V2Metadata.java b/core/src/main/java/org/apache/iceberg/V2Metadata.java
index ee57f24..c3d3d5e 100644
--- a/core/src/main/java/org/apache/iceberg/V2Metadata.java
+++ b/core/src/main/java/org/apache/iceberg/V2Metadata.java
@@ -257,19 +257,19 @@ class V2Metadata {
);
}
- static class IndexedManifestEntry implements ManifestEntry, IndexedRecord {
+ static class IndexedManifestEntry<F extends ContentFile<F>> implements ManifestEntry<F>, IndexedRecord {
private final org.apache.avro.Schema avroSchema;
private final Long commitSnapshotId;
- private final IndexedDataFile fileWrapper;
- private ManifestEntry wrapped = null;
+ private final IndexedDataFile<?> fileWrapper;
+ private ManifestEntry<F> wrapped = null;
IndexedManifestEntry(Long commitSnapshotId, Types.StructType partitionType) {
this.avroSchema = AvroSchemaUtil.convert(entrySchema(partitionType), "manifest_entry");
this.commitSnapshotId = commitSnapshotId;
- this.fileWrapper = new IndexedDataFile(partitionType);
+ this.fileWrapper = new IndexedDataFile<>(partitionType);
}
- public IndexedManifestEntry wrap(ManifestEntry entry) {
+ public IndexedManifestEntry<F> wrap(ManifestEntry<F> entry) {
this.wrapped = entry;
return this;
}
@@ -335,36 +335,37 @@ class V2Metadata {
}
@Override
- public DataFile file() {
+ public F file() {
return wrapped.file();
}
@Override
- public ManifestEntry copy() {
+ public ManifestEntry<F> copy() {
return wrapped.copy();
}
@Override
- public ManifestEntry copyWithoutStats() {
+ public ManifestEntry<F> copyWithoutStats() {
return wrapped.copyWithoutStats();
}
}
/**
- * Wrapper used to write a DataFile to v2 metadata.
+ * Wrapper used to write a DataFile or DeleteFile to v2 metadata.
*/
- static class IndexedDataFile implements DataFile, IndexedRecord {
+ static class IndexedDataFile<F> implements ContentFile<F>, IndexedRecord {
private final org.apache.avro.Schema avroSchema;
private final IndexedStructLike partitionWrapper;
- private DataFile wrapped = null;
+ private ContentFile<F> wrapped = null;
IndexedDataFile(Types.StructType partitionType) {
this.avroSchema = AvroSchemaUtil.convert(fileType(partitionType), "data_file");
this.partitionWrapper = new IndexedStructLike(avroSchema.getField("partition").schema());
}
- IndexedDataFile wrap(DataFile file) {
- this.wrapped = file;
+ @SuppressWarnings("unchecked")
+ IndexedDataFile<F> wrap(ContentFile<?> file) {
+ this.wrapped = (ContentFile<F>) file;
return this;
}
@@ -377,7 +378,7 @@ class V2Metadata {
public Object get(int pos) {
switch (pos) {
case 0:
- return FileContent.DATA.id();
+ return wrapped.content().id();
case 1:
return wrapped.path().toString();
case 2:
@@ -412,6 +413,11 @@ class V2Metadata {
}
@Override
+ public FileContent content() {
+ return wrapped.content();
+ }
+
+ @Override
public CharSequence path() {
return wrapped.path();
}
@@ -472,13 +478,13 @@ class V2Metadata {
}
@Override
- public DataFile copy() {
- return wrapped.copy();
+ public F copy() {
+ throw new UnsupportedOperationException("Cannot copy IndexedDataFile wrapper");
}
@Override
- public DataFile copyWithoutStats() {
- return wrapped.copyWithoutStats();
+ public F copyWithoutStats() {
+ throw new UnsupportedOperationException("Cannot copy IndexedDataFile wrapper");
}
}
}
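One note on the copy() change at the end of V2Metadata: IndexedDataFile is a single write-side wrapper that is re-wrapped for every row, so a copy would alias shared mutable state; throwing is the safer contract. A sketch of the reuse pattern it protects, with `partitionType`, `fileA`, and `fileB` as hypothetical values:
// Sketch only: the same wrapper instance is mutated in place for each row.
V2Metadata.IndexedDataFile<DataFile> wrapper = new V2Metadata.IndexedDataFile<>(partitionType);
IndexedRecord row1 = wrapper.wrap(fileA);
IndexedRecord row2 = wrapper.wrap(fileB); // row1 and row2 are the same object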
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index e2c7378..8de5e4b 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -147,7 +147,7 @@ public class TableTestBase {
Assert.assertTrue(manifestFile.delete());
OutputFile outputFile = table.ops().io().newOutputFile(manifestFile.getCanonicalPath());
- ManifestWriter writer = ManifestFiles.write(formatVersion, table.spec(), outputFile, snapshotId);
+ ManifestWriter<DataFile> writer = ManifestFiles.write(formatVersion, table.spec(), outputFile, snapshotId);
try {
for (DataFile file : files) {
writer.add(file);
@@ -159,22 +159,22 @@ public class TableTestBase {
return writer.toManifestFile();
}
- ManifestFile writeManifest(String fileName, ManifestEntry... entries) throws IOException {
+ ManifestFile writeManifest(String fileName, ManifestEntry<DataFile>... entries) throws IOException {
return writeManifest(null, fileName, entries);
}
- ManifestFile writeManifest(Long snapshotId, ManifestEntry... entries) throws IOException {
+ ManifestFile writeManifest(Long snapshotId, ManifestEntry<DataFile>... entries) throws IOException {
return writeManifest(snapshotId, "input.m0.avro", entries);
}
- ManifestFile writeManifest(Long snapshotId, String fileName, ManifestEntry... entries) throws IOException {
+ ManifestFile writeManifest(Long snapshotId, String fileName, ManifestEntry<DataFile>... entries) throws IOException {
File manifestFile = temp.newFile(fileName);
Assert.assertTrue(manifestFile.delete());
OutputFile outputFile = table.ops().io().newOutputFile(manifestFile.getCanonicalPath());
- ManifestWriter writer = ManifestFiles.write(formatVersion, table.spec(), outputFile, snapshotId);
+ ManifestWriter<DataFile> writer = ManifestFiles.write(formatVersion, table.spec(), outputFile, snapshotId);
try {
- for (ManifestEntry entry : entries) {
+ for (ManifestEntry<DataFile> entry : entries) {
writer.addEntry(entry);
}
} finally {
@@ -189,7 +189,7 @@ public class TableTestBase {
Assert.assertTrue(manifestFile.delete());
OutputFile outputFile = table.ops().io().newOutputFile(manifestFile.getCanonicalPath());
- ManifestWriter writer = ManifestFiles.write(formatVersion, table.spec(), outputFile, null);
+ ManifestWriter<DataFile> writer = ManifestFiles.write(formatVersion, table.spec(), outputFile, null);
try {
for (DataFile file : files) {
writer.add(file);
@@ -201,8 +201,8 @@ public class TableTestBase {
return writer.toManifestFile();
}
- ManifestEntry manifestEntry(ManifestEntry.Status status, Long snapshotId, DataFile file) {
- GenericManifestEntry entry = new GenericManifestEntry(table.spec().partitionType());
+ ManifestEntry<DataFile> manifestEntry(ManifestEntry.Status status, Long snapshotId, DataFile file) {
+ GenericManifestEntry<DataFile> entry = new GenericManifestEntry<>(table.spec().partitionType());
switch (status) {
case ADDED:
return entry.wrapAppend(snapshotId, file);
@@ -240,7 +240,7 @@ public class TableTestBase {
long id = snap.snapshotId();
Iterator<String> newPaths = paths(newFiles).iterator();
- for (ManifestEntry entry : ManifestFiles.read(manifest, FILE_IO).entries()) {
+ for (ManifestEntry<DataFile> entry : ManifestFiles.read(manifest, FILE_IO).entries()) {
DataFile file = entry.file();
if (sequenceNumber != null) {
V1Assert.assertEquals("Sequence number should default to 0", 0, entry.sequenceNumber().longValue());
@@ -283,7 +283,7 @@ public class TableTestBase {
Iterator<Long> seqs,
Iterator<Long> ids,
Iterator<DataFile> expectedFiles) {
- for (ManifestEntry entry : ManifestFiles.read(manifest, FILE_IO).entries()) {
+ for (ManifestEntry<DataFile> entry : ManifestFiles.read(manifest, FILE_IO).entries()) {
DataFile file = entry.file();
DataFile expected = expectedFiles.next();
if (seqs != null) {
@@ -303,7 +303,7 @@ public class TableTestBase {
Iterator<Long> ids,
Iterator<DataFile> expectedFiles,
Iterator<ManifestEntry.Status> expectedStatuses) {
- for (ManifestEntry entry : ManifestFiles.read(manifest, FILE_IO).entries()) {
+ for (ManifestEntry<DataFile> entry : ManifestFiles.read(manifest, FILE_IO).entries()) {
DataFile file = entry.file();
DataFile expected = expectedFiles.next();
final ManifestEntry.Status expectedStatus = expectedStatuses.next();
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestReader.java b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
index b670fd4..e899fb7 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestReader.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
@@ -47,7 +47,7 @@ public class TestManifestReader extends TableTestBase {
public void testManifestReaderWithEmptyInheritableMetadata() throws IOException {
ManifestFile manifest = writeManifest(1000L, manifestEntry(Status.EXISTING, 1000L, FILE_A));
try (ManifestReader reader = ManifestFiles.read(manifest, FILE_IO)) {
- ManifestEntry entry = Iterables.getOnlyElement(reader.entries());
+ ManifestEntry<DataFile> entry = Iterables.getOnlyElement(reader.entries());
Assert.assertEquals(Status.EXISTING, entry.status());
Assert.assertEquals(FILE_A.path(), entry.file().path());
Assert.assertEquals(1000L, (long) entry.snapshotId());
@@ -67,7 +67,7 @@ public class TestManifestReader extends TableTestBase {
public void testManifestReaderWithPartitionMetadata() throws IOException {
ManifestFile manifest = writeManifest(1000L, manifestEntry(Status.EXISTING, 123L, FILE_A));
try (ManifestReader reader = ManifestFiles.read(manifest, FILE_IO)) {
- ManifestEntry entry = Iterables.getOnlyElement(reader.entries());
+ ManifestEntry<DataFile> entry = Iterables.getOnlyElement(reader.entries());
Assert.assertEquals(123L, (long) entry.snapshotId());
List<Types.NestedField> fields = ((PartitionData) entry.file().partition()).getPartitionType().fields();
@@ -88,7 +88,7 @@ public class TestManifestReader extends TableTestBase {
ManifestFile manifest = writeManifest(1000L, manifestEntry(Status.EXISTING, 123L, FILE_A));
try (ManifestReader reader = ManifestFiles.read(manifest, FILE_IO)) {
- ManifestEntry entry = Iterables.getOnlyElement(reader.entries());
+ ManifestEntry<DataFile> entry = Iterables.getOnlyElement(reader.entries());
Assert.assertEquals(123L, (long) entry.snapshotId());
List<Types.NestedField> fields = ((PartitionData) entry.file().partition()).getPartitionType().fields();
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
index 6177797..32e2d9a 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
@@ -70,6 +70,9 @@ public class TestManifestWriterVersions {
private static final DataFile DATA_FILE = new GenericDataFile(
PATH, FORMAT, PARTITION, 150972L, METRICS, null, OFFSETS);
+ private static final DeleteFile DELETE_FILE = new GenericDeleteFile(
+ FileContent.EQUALITY_DELETES, PATH, FORMAT, PARTITION, 22905L, METRICS, null);
+
@Rule
public TemporaryFolder temp = new TemporaryFolder();
@@ -77,7 +80,14 @@ public class TestManifestWriterVersions {
public void testV1Write() throws IOException {
ManifestFile manifest = writeManifest(1);
checkManifest(manifest, ManifestWriter.UNASSIGNED_SEQ);
- checkEntry(readManifest(manifest), ManifestWriter.UNASSIGNED_SEQ);
+ checkEntry(readManifest(manifest), ManifestWriter.UNASSIGNED_SEQ, FileContent.DATA);
+ }
+
+ @Test
+ public void testV1WriteDelete() {
+ AssertHelpers.assertThrows("Should fail to write a delete manifest for v1",
+ IllegalArgumentException.class, "Cannot write delete files in a v1 table",
+ () -> writeDeleteManifest(1));
}
@Test
@@ -86,23 +96,43 @@ public class TestManifestWriterVersions {
checkManifest(manifest, 0L);
// v1 should be read using sequence number 0 because it was missing from the manifest list file
- checkEntry(readManifest(manifest), 0L);
+ checkEntry(readManifest(manifest), 0L, FileContent.DATA);
}
@Test
public void testV2Write() throws IOException {
ManifestFile manifest = writeManifest(1);
checkManifest(manifest, ManifestWriter.UNASSIGNED_SEQ);
- checkEntry(readManifest(manifest), ManifestWriter.UNASSIGNED_SEQ);
+ Assert.assertEquals("Content", ManifestContent.DATA, manifest.content());
+ checkEntry(readManifest(manifest), ManifestWriter.UNASSIGNED_SEQ, FileContent.DATA);
}
@Test
public void testV2WriteWithInheritance() throws IOException {
ManifestFile manifest = writeAndReadManifestList(writeManifest(2), 2);
checkManifest(manifest, SEQUENCE_NUMBER);
+ Assert.assertEquals("Content", ManifestContent.DATA, manifest.content());
+
+ // v2 should use the correct sequence number by inheriting it
+ checkEntry(readManifest(manifest), SEQUENCE_NUMBER, FileContent.DATA);
+ }
+
+ @Test
+ public void testV2WriteDelete() throws IOException {
+ ManifestFile manifest = writeDeleteManifest(2);
+ checkManifest(manifest, ManifestWriter.UNASSIGNED_SEQ);
+ Assert.assertEquals("Content", ManifestContent.DELETES, manifest.content());
+ checkEntry(readDeleteManifest(manifest), ManifestWriter.UNASSIGNED_SEQ, FileContent.EQUALITY_DELETES);
+ }
+
+ @Test
+ public void testV2WriteDeleteWithInheritance() throws IOException {
+ ManifestFile manifest = writeAndReadManifestList(writeDeleteManifest(2), 2);
+ checkManifest(manifest, SEQUENCE_NUMBER);
+ Assert.assertEquals("Content", ManifestContent.DELETES, manifest.content());
// v2 should use the correct sequence number by inheriting it
- checkEntry(readManifest(manifest), SEQUENCE_NUMBER);
+ checkEntry(readDeleteManifest(manifest), SEQUENCE_NUMBER, FileContent.EQUALITY_DELETES);
}
@Test
@@ -117,7 +147,7 @@ public class TestManifestWriterVersions {
checkManifest(manifest2, 0L);
// should not inherit the v2 sequence number because it was a rewrite
- checkEntry(readManifest(manifest2), 0L);
+ checkEntry(readManifest(manifest2), 0L, FileContent.DATA);
}
@Test
@@ -136,24 +166,26 @@ public class TestManifestWriterVersions {
checkRewrittenManifest(manifest2, SEQUENCE_NUMBER, 0L);
// should not inherit the v2 sequence number because it was written into the v2 manifest
- checkRewrittenEntry(readManifest(manifest2), 0L);
+ checkRewrittenEntry(readManifest(manifest2), 0L, FileContent.DATA);
}
- void checkEntry(ManifestEntry entry, Long expectedSequenceNumber) {
+ void checkEntry(ManifestEntry<?> entry, Long expectedSequenceNumber, FileContent content) {
Assert.assertEquals("Status", ManifestEntry.Status.ADDED, entry.status());
Assert.assertEquals("Snapshot ID", (Long) SNAPSHOT_ID, entry.snapshotId());
Assert.assertEquals("Sequence number", expectedSequenceNumber, entry.sequenceNumber());
- checkDataFile(entry.file());
+ checkDataFile(entry.file(), content);
}
- void checkRewrittenEntry(ManifestEntry entry, Long expectedSequenceNumber) {
+ void checkRewrittenEntry(ManifestEntry<DataFile> entry, Long expectedSequenceNumber, FileContent content) {
Assert.assertEquals("Status", ManifestEntry.Status.EXISTING, entry.status());
Assert.assertEquals("Snapshot ID", (Long) SNAPSHOT_ID, entry.snapshotId());
Assert.assertEquals("Sequence number", expectedSequenceNumber, entry.sequenceNumber());
- checkDataFile(entry.file());
+ checkDataFile(entry.file(), content);
}
- void checkDataFile(DataFile dataFile) {
+ void checkDataFile(ContentFile<?> dataFile, FileContent content) {
+ // ContentFile is the shared interface of DataFile and DeleteFile, so this method can check both
+ Assert.assertEquals("Content", content, dataFile.content());
Assert.assertEquals("Path", PATH, dataFile.path());
Assert.assertEquals("Format", FORMAT, dataFile.format());
Assert.assertEquals("Partition", PARTITION, dataFile.partition());
@@ -206,7 +238,7 @@ public class TestManifestWriterVersions {
private ManifestFile rewriteManifest(ManifestFile manifest, int formatVersion) throws IOException {
OutputFile manifestFile = Files.localOutput(FileFormat.AVRO.addExtension(temp.newFile().toString()));
- ManifestWriter writer = ManifestFiles.write(formatVersion, SPEC, manifestFile, SNAPSHOT_ID);
+ ManifestWriter<DataFile> writer = ManifestFiles.write(formatVersion, SPEC, manifestFile, SNAPSHOT_ID);
try {
writer.existing(readManifest(manifest));
} finally {
@@ -216,21 +248,46 @@ public class TestManifestWriterVersions {
}
private ManifestFile writeManifest(int formatVersion) throws IOException {
+ return writeManifest(DATA_FILE, formatVersion);
+ }
+
+ private ManifestFile writeManifest(DataFile file, int formatVersion) throws IOException {
OutputFile manifestFile = Files.localOutput(FileFormat.AVRO.addExtension(temp.newFile().toString()));
- ManifestWriter writer = ManifestFiles.write(formatVersion, SPEC, manifestFile, SNAPSHOT_ID);
+ ManifestWriter<DataFile> writer = ManifestFiles.write(formatVersion, SPEC, manifestFile, SNAPSHOT_ID);
try {
- writer.add(DATA_FILE);
+ writer.add(file);
} finally {
writer.close();
}
return writer.toManifestFile();
}
- private ManifestEntry readManifest(ManifestFile manifest) throws IOException {
- try (CloseableIterable<ManifestEntry> reader = ManifestFiles.read(manifest, FILE_IO).entries()) {
- List<ManifestEntry> files = Lists.newArrayList(reader);
+ private ManifestEntry<DataFile> readManifest(ManifestFile manifest) throws IOException {
+ try (CloseableIterable<ManifestEntry<DataFile>> reader = ManifestFiles.read(manifest, FILE_IO).entries()) {
+ List<ManifestEntry<DataFile>> files = Lists.newArrayList(reader);
Assert.assertEquals("Should contain only one data file", 1, files.size());
return files.get(0);
}
}
+
+ private ManifestFile writeDeleteManifest(int formatVersion) throws IOException {
+ OutputFile manifestFile = Files.localOutput(FileFormat.AVRO.addExtension(temp.newFile().toString()));
+ ManifestWriter<DeleteFile> writer = ManifestFiles.writeDeleteManifest(
+ formatVersion, SPEC, manifestFile, SNAPSHOT_ID);
+ try {
+ writer.add(DELETE_FILE);
+ } finally {
+ writer.close();
+ }
+ return writer.toManifestFile();
+ }
+
+ private ManifestEntry<DeleteFile> readDeleteManifest(ManifestFile manifest) throws IOException {
+ try (CloseableIterable<ManifestEntry<DeleteFile>> reader =
+ ManifestFiles.readDeleteManifest(manifest, FILE_IO, null).entries()) {
+ List<ManifestEntry<DeleteFile>> entries = Lists.newArrayList(reader);
+ Assert.assertEquals("Should contain only one data file", 1, entries.size());
+ return entries.get(0);
+ }
+ }
}
diff --git a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
index 47f0d90..a393adf 100644
--- a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
+++ b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java
@@ -745,7 +745,7 @@ public class TestMergeAppend extends TableTestBase {
initialManifest, pending.manifests().get(1));
// field ids of manifest entries in two manifests with different specs of the same source field should be different
- ManifestEntry entry = ManifestFiles.read(pending.manifests().get(0), FILE_IO).entries().iterator().next();
+ ManifestEntry<DataFile> entry = ManifestFiles.read(pending.manifests().get(0), FILE_IO).entries().iterator().next();
Types.NestedField field = ((PartitionData) entry.file().partition()).getPartitionType().fields().get(0);
Assert.assertEquals(1000, field.fieldId());
Assert.assertEquals("id_bucket", field.name());
diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
index f6d0134..cb64793 100644
--- a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
+++ b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java
@@ -955,7 +955,7 @@ public class TestRewriteManifests extends TableTestBase {
Assert.assertEquals(3, Iterables.size(table.snapshots()));
- ManifestEntry entry = manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_A);
+ ManifestEntry<DataFile> entry = manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_A);
// update the entry's sequence number or else it will be rejected by the writer
entry.setSequenceNumber(firstSnapshot.sequenceNumber());
ManifestFile newManifest = writeManifest("manifest-file-1.avro", entry);
@@ -1005,7 +1005,7 @@ public class TestRewriteManifests extends TableTestBase {
Assert.assertEquals(1, manifests.size());
ManifestFile manifest = manifests.get(0);
- ManifestEntry appendEntry = manifestEntry(ManifestEntry.Status.ADDED, snapshot.snapshotId(), FILE_A);
+ ManifestEntry<DataFile> appendEntry = manifestEntry(ManifestEntry.Status.ADDED, snapshot.snapshotId(), FILE_A);
// update the entry's sequence number or else it will be rejected by the writer
appendEntry.setSequenceNumber(snapshot.sequenceNumber());
@@ -1018,7 +1018,7 @@ public class TestRewriteManifests extends TableTestBase {
.addManifest(invalidAddedFileManifest)
.commit());
- ManifestEntry deleteEntry = manifestEntry(ManifestEntry.Status.DELETED, snapshot.snapshotId(), FILE_A);
+ ManifestEntry<DataFile> deleteEntry = manifestEntry(ManifestEntry.Status.DELETED, snapshot.snapshotId(), FILE_A);
// update the entry's sequence number or else it will be rejected by the writer
deleteEntry.setSequenceNumber(snapshot.sequenceNumber());