You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2020/05/23 19:10:52 UTC
[incubator-iceberg] branch master updated: Add content types to DataFile and ManifestFile (#1030)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 01d1462 Add content types to DataFile and ManifestFile (#1030)
01d1462 is described below
commit 01d1462756db20a14a9ac67166e5bf56966861b4
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Sat May 23 12:10:42 2020 -0700
Add content types to DataFile and ManifestFile (#1030)
---
api/src/main/java/org/apache/iceberg/DataFile.java | 80 +++++---
.../main/java/org/apache/iceberg/FileContent.java | 39 ++++
.../java/org/apache/iceberg/ManifestContent.java | 38 ++++
.../main/java/org/apache/iceberg/ManifestFile.java | 53 +++--
.../main/java/org/apache/iceberg/types/Types.java | 14 ++
.../test/java/org/apache/iceberg/TestHelpers.java | 10 +-
.../java/org/apache/iceberg/AllEntriesTable.java | 6 +-
.../java/org/apache/iceberg/GenericDataFile.java | 38 ++--
.../org/apache/iceberg/GenericManifestFile.java | 53 +++--
.../org/apache/iceberg/ManifestListWriter.java | 3 +
.../java/org/apache/iceberg/ManifestWriter.java | 3 +-
.../java/org/apache/iceberg/SnapshotProducer.java | 4 +-
.../main/java/org/apache/iceberg/V1Metadata.java | 35 +++-
.../main/java/org/apache/iceberg/V2Metadata.java | 225 +++++++++++++++++----
.../apache/iceberg/TestManifestListVersions.java | 34 +++-
.../spark/source/TestIcebergSourceTablesBase.java | 60 +++---
16 files changed, 535 insertions(+), 160 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/DataFile.java b/api/src/main/java/org/apache/iceberg/DataFile.java
index 90758a3..d9a4441 100644
--- a/api/src/main/java/org/apache/iceberg/DataFile.java
+++ b/api/src/main/java/org/apache/iceberg/DataFile.java
@@ -22,6 +22,7 @@ package org.apache.iceberg;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
+import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.BinaryType;
import org.apache.iceberg.types.Types.IntegerType;
import org.apache.iceberg.types.Types.ListType;
@@ -37,32 +38,59 @@ import static org.apache.iceberg.types.Types.NestedField.required;
* Interface for files listed in a table manifest.
*/
public interface DataFile {
+ // fields for adding delete data files
+ Types.NestedField CONTENT = optional(134, "content", IntegerType.get(),
+ "Contents of the file: 0=data, 1=position deletes, 2=equality deletes");
+ Types.NestedField FILE_PATH = required(100, "file_path", StringType.get(), "Location URI with FS scheme");
+ Types.NestedField FILE_FORMAT = required(101, "file_format", StringType.get(),
+ "File format name: avro, orc, or parquet");
+ Types.NestedField RECORD_COUNT = required(103, "record_count", LongType.get(), "Number of records in the file");
+ Types.NestedField FILE_SIZE = required(104, "file_size_in_bytes", LongType.get(), "Total file size in bytes");
+ Types.NestedField COLUMN_SIZES = optional(108, "column_sizes", MapType.ofRequired(117, 118,
+ IntegerType.get(), LongType.get()), "Map of column id to total size on disk");
+ Types.NestedField VALUE_COUNTS = optional(109, "value_counts", MapType.ofRequired(119, 120,
+ IntegerType.get(), LongType.get()), "Map of column id to total count, including null and NaN");
+ Types.NestedField NULL_VALUE_COUNTS = optional(110, "null_value_counts", MapType.ofRequired(121, 122,
+ IntegerType.get(), LongType.get()), "Map of column id to null value count");
+ Types.NestedField LOWER_BOUNDS = optional(125, "lower_bounds", MapType.ofRequired(126, 127,
+ IntegerType.get(), BinaryType.get()), "Map of column id to lower bound");
+ Types.NestedField UPPER_BOUNDS = optional(128, "upper_bounds", MapType.ofRequired(129, 130,
+ IntegerType.get(), BinaryType.get()), "Map of column id to upper bound");
+ Types.NestedField KEY_METADATA = optional(131, "key_metadata", BinaryType.get(), "Encryption key metadata blob");
+ Types.NestedField SPLIT_OFFSETS = optional(132, "split_offsets", ListType.ofRequired(133, LongType.get()),
+ "Splittable offsets");
+ int PARTITION_ID = 102;
+ String PARTITION_NAME = "partition";
+ String PARTITION_DOC = "Partition data tuple, schema based on the partition spec";
+ // NEXT ID TO ASSIGN: 135
+
static StructType getType(StructType partitionType) {
// IDs start at 100 to leave room for changes to ManifestEntry
return StructType.of(
- required(100, "file_path", StringType.get()),
- required(101, "file_format", StringType.get()),
- required(102, "partition", partitionType),
- required(103, "record_count", LongType.get()),
- required(104, "file_size_in_bytes", LongType.get()),
- required(105, "block_size_in_bytes", LongType.get()),
- optional(108, "column_sizes", MapType.ofRequired(117, 118,
- IntegerType.get(), LongType.get())),
- optional(109, "value_counts", MapType.ofRequired(119, 120,
- IntegerType.get(), LongType.get())),
- optional(110, "null_value_counts", MapType.ofRequired(121, 122,
- IntegerType.get(), LongType.get())),
- optional(125, "lower_bounds", MapType.ofRequired(126, 127,
- IntegerType.get(), BinaryType.get())),
- optional(128, "upper_bounds", MapType.ofRequired(129, 130,
- IntegerType.get(), BinaryType.get())),
- optional(131, "key_metadata", BinaryType.get()),
- optional(132, "split_offsets", ListType.ofRequired(133, LongType.get()))
- // NEXT ID TO ASSIGN: 134
+ CONTENT,
+ FILE_PATH,
+ FILE_FORMAT,
+ required(PARTITION_ID, PARTITION_NAME, partitionType, PARTITION_DOC),
+ RECORD_COUNT,
+ FILE_SIZE,
+ COLUMN_SIZES,
+ VALUE_COUNTS,
+ NULL_VALUE_COUNTS,
+ LOWER_BOUNDS,
+ UPPER_BOUNDS,
+ KEY_METADATA,
+ SPLIT_OFFSETS
);
}
/**
+ * @return the content stored in the file; one of DATA, POSITION_DELETES, or EQUALITY_DELETES
+ */
+ default FileContent content() {
+ return FileContent.DATA;
+ }
+
+ /**
* @return fully qualified path to the file, suitable for constructing a Hadoop Path
*/
CharSequence path();
@@ -119,6 +147,13 @@ public interface DataFile {
ByteBuffer keyMetadata();
/**
+ * @return List of recommended split locations, if applicable, null otherwise.
+ * When available, this information is used for planning scan tasks whose boundaries
+ * are determined by these offsets. The returned list must be sorted in ascending order.
+ */
+ List<Long> splitOffsets();
+
+ /**
* Copies this {@link DataFile data file}. Manifest readers can reuse data file instances; use
* this method to copy data when collecting files from tasks.
*
@@ -133,11 +168,4 @@ public interface DataFile {
* @return a copy of this data file, without lower bounds, upper bounds, value counts, or null value counts
*/
DataFile copyWithoutStats();
-
- /**
- * @return List of recommended split locations, if applicable, null otherwise.
- * When available, this information is used for planning scan tasks whose boundaries
- * are determined by these offsets. The returned list must be sorted in ascending order.
- */
- List<Long> splitOffsets();
}
diff --git a/api/src/main/java/org/apache/iceberg/FileContent.java b/api/src/main/java/org/apache/iceberg/FileContent.java
new file mode 100644
index 0000000..67bfca7
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/FileContent.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+/**
+ * Content type stored in a file, one of DATA, POSITION_DELETES, or EQUALITY_DELETES.
+ */
+public enum FileContent {
+ DATA(0),
+ POSITION_DELETES(1),
+ EQUALITY_DELETES(2);
+
+ private final int id;
+
+ FileContent(int id) {
+ this.id = id;
+ }
+
+ public int id() {
+ return id;
+ }
+}
diff --git a/api/src/main/java/org/apache/iceberg/ManifestContent.java b/api/src/main/java/org/apache/iceberg/ManifestContent.java
new file mode 100644
index 0000000..b4b4473
--- /dev/null
+++ b/api/src/main/java/org/apache/iceberg/ManifestContent.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg;
+
+/**
+ * Content type stored in a manifest file, either DATA or DELETES.
+ */
+public enum ManifestContent {
+ DATA(0),
+ DELETES(1);
+
+ private final int id;
+
+ ManifestContent(int id) {
+ this.id = id;
+ }
+
+ public int id() {
+ return id;
+ }
+}
diff --git a/api/src/main/java/org/apache/iceberg/ManifestFile.java b/api/src/main/java/org/apache/iceberg/ManifestFile.java
index 7c4739a..713c061 100644
--- a/api/src/main/java/org/apache/iceberg/ManifestFile.java
+++ b/api/src/main/java/org/apache/iceberg/ManifestFile.java
@@ -30,29 +30,41 @@ import static org.apache.iceberg.types.Types.NestedField.required;
* Represents a manifest file that can be scanned to find data files in a table.
*/
public interface ManifestFile {
- Types.NestedField PATH = required(500, "manifest_path", Types.StringType.get());
- Types.NestedField LENGTH = required(501, "manifest_length", Types.LongType.get());
- Types.NestedField SPEC_ID = required(502, "partition_spec_id", Types.IntegerType.get());
- Types.NestedField SNAPSHOT_ID = optional(503, "added_snapshot_id", Types.LongType.get());
- Types.NestedField ADDED_FILES_COUNT = optional(504, "added_data_files_count", Types.IntegerType.get());
- Types.NestedField EXISTING_FILES_COUNT = optional(505, "existing_data_files_count", Types.IntegerType.get());
- Types.NestedField DELETED_FILES_COUNT = optional(506, "deleted_data_files_count", Types.IntegerType.get());
+ Types.NestedField PATH = required(500, "manifest_path", Types.StringType.get(), "Location URI with FS scheme");
+ Types.NestedField LENGTH = required(501, "manifest_length", Types.LongType.get(), "Total file size in bytes");
+ Types.NestedField SPEC_ID = required(502, "partition_spec_id", Types.IntegerType.get(), "Spec ID used to write");
+ Types.NestedField MANIFEST_CONTENT = optional(517, "content", Types.IntegerType.get(),
+ "Contents of the manifest: 0=data, 1=deletes");
+ Types.NestedField SEQUENCE_NUMBER = optional(515, "sequence_number", Types.LongType.get(),
+ "Sequence number when the manifest was added");
+ Types.NestedField MIN_SEQUENCE_NUMBER = optional(516, "min_sequence_number", Types.LongType.get(),
+ "Lowest sequence number in the manifest");
+ Types.NestedField SNAPSHOT_ID = optional(503, "added_snapshot_id", Types.LongType.get(),
+ "Snapshot ID that added the manifest");
+ Types.NestedField ADDED_FILES_COUNT = optional(504, "added_data_files_count", Types.IntegerType.get(),
+ "Added entry count");
+ Types.NestedField EXISTING_FILES_COUNT = optional(505, "existing_data_files_count", Types.IntegerType.get(),
+ "Existing entry count");
+ Types.NestedField DELETED_FILES_COUNT = optional(506, "deleted_data_files_count", Types.IntegerType.get(),
+ "Deleted entry count");
+ Types.NestedField ADDED_ROWS_COUNT = optional(512, "added_rows_count", Types.LongType.get(),
+ "Added rows count");
+ Types.NestedField EXISTING_ROWS_COUNT = optional(513, "existing_rows_count", Types.LongType.get(),
+ "Existing rows count");
+ Types.NestedField DELETED_ROWS_COUNT = optional(514, "deleted_rows_count", Types.LongType.get(),
+ "Deleted rows count");
Types.StructType PARTITION_SUMMARY_TYPE = Types.StructType.of(
- required(509, "contains_null", Types.BooleanType.get()),
- optional(510, "lower_bound", Types.BinaryType.get()), // null if no non-null values
- optional(511, "upper_bound", Types.BinaryType.get())
+ required(509, "contains_null", Types.BooleanType.get(), "True if any file has a null partition value"),
+ optional(510, "lower_bound", Types.BinaryType.get(), "Partition lower bound for all files"),
+ optional(511, "upper_bound", Types.BinaryType.get(), "Partition upper bound for all files")
);
Types.NestedField PARTITION_SUMMARIES = optional(507, "partitions",
- Types.ListType.ofRequired(508, PARTITION_SUMMARY_TYPE));
- Types.NestedField ADDED_ROWS_COUNT = optional(512, "added_rows_count", Types.LongType.get());
- Types.NestedField EXISTING_ROWS_COUNT = optional(513, "existing_rows_count", Types.LongType.get());
- Types.NestedField DELETED_ROWS_COUNT = optional(514, "deleted_rows_count", Types.LongType.get());
- Types.NestedField SEQUENCE_NUMBER = optional(515, "sequence_number", Types.LongType.get());
- Types.NestedField MIN_SEQUENCE_NUMBER = optional(516, "min_sequence_number", Types.LongType.get());
- // next ID to assign: 517
+ Types.ListType.ofRequired(508, PARTITION_SUMMARY_TYPE),
+ "Summary for each partition");
+ // next ID to assign: 518
Schema SCHEMA = new Schema(
- PATH, LENGTH, SPEC_ID,
+ PATH, LENGTH, SPEC_ID, MANIFEST_CONTENT,
SEQUENCE_NUMBER, MIN_SEQUENCE_NUMBER, SNAPSHOT_ID,
ADDED_FILES_COUNT, EXISTING_FILES_COUNT, DELETED_FILES_COUNT,
ADDED_ROWS_COUNT, EXISTING_ROWS_COUNT, DELETED_ROWS_COUNT,
@@ -78,6 +90,11 @@ public interface ManifestFile {
int partitionSpecId();
/**
+ * @return the content stored in the manifest; either DATA or DELETES
+ */
+ ManifestContent content();
+
+ /**
* @return the sequence number of the commit that added the manifest file
*/
long sequenceNumber();
diff --git a/api/src/main/java/org/apache/iceberg/types/Types.java b/api/src/main/java/org/apache/iceberg/types/Types.java
index 48e7c96..df398d8 100644
--- a/api/src/main/java/org/apache/iceberg/types/Types.java
+++ b/api/src/main/java/org/apache/iceberg/types/Types.java
@@ -457,10 +457,24 @@ public class Types {
return isOptional;
}
+ public NestedField asOptional() {
+ if (isOptional) {
+ return this;
+ }
+ return new NestedField(true, id, name, type, doc);
+ }
+
public boolean isRequired() {
return !isOptional;
}
+ public NestedField asRequired() {
+ if (!isOptional) {
+ return this;
+ }
+ return new NestedField(false, id, name, type, doc);
+ }
+
public int fieldId() {
return id;
}
diff --git a/api/src/test/java/org/apache/iceberg/TestHelpers.java b/api/src/test/java/org/apache/iceberg/TestHelpers.java
index 4989281..b4881b3 100644
--- a/api/src/test/java/org/apache/iceberg/TestHelpers.java
+++ b/api/src/test/java/org/apache/iceberg/TestHelpers.java
@@ -163,6 +163,7 @@ public class TestHelpers {
private final String path;
private final long length;
private final int specId;
+ private final ManifestContent content;
private final Long snapshotId;
private final Integer addedFiles;
private final Long addedRows;
@@ -178,6 +179,7 @@ public class TestHelpers {
this.path = path;
this.length = length;
this.specId = specId;
+ this.content = ManifestContent.DATA;
this.snapshotId = snapshotId;
this.addedFiles = addedFiles;
this.addedRows = null;
@@ -188,13 +190,14 @@ public class TestHelpers {
this.partitions = partitions;
}
- public TestManifestFile(String path, long length, int specId, Long snapshotId,
+ public TestManifestFile(String path, long length, int specId, ManifestContent content, Long snapshotId,
Integer addedFiles, Long addedRows, Integer existingFiles,
Long existingRows, Integer deletedFiles, Long deletedRows,
List<PartitionFieldSummary> partitions) {
this.path = path;
this.length = length;
this.specId = specId;
+ this.content = content;
this.snapshotId = snapshotId;
this.addedFiles = addedFiles;
this.addedRows = addedRows;
@@ -221,6 +224,11 @@ public class TestHelpers {
}
@Override
+ public ManifestContent content() {
+ return content;
+ }
+
+ @Override
public long sequenceNumber() {
return 0;
}
diff --git a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
index 9c6aa85..d48ec76 100644
--- a/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
+++ b/core/src/main/java/org/apache/iceberg/AllEntriesTable.java
@@ -100,11 +100,13 @@ public class AllEntriesTable extends BaseMetadataTable {
protected CloseableIterable<FileScanTask> planFiles(
TableOperations ops, Snapshot snapshot, Expression rowFilter, boolean caseSensitive, boolean colStats) {
CloseableIterable<ManifestFile> manifests = AllDataFilesTable.allManifestFiles(ops.current().snapshots());
+ Schema fileSchema = new Schema(schema().findType("data_file").asStructType().fields());
String schemaString = SchemaParser.toJson(schema());
String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned());
+ ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(rowFilter);
- return CloseableIterable.transform(manifests, manifest -> new BaseFileScanTask(
- DataFiles.fromManifest(manifest), schemaString, specString, ResidualEvaluator.unpartitioned(rowFilter)));
+ return CloseableIterable.transform(manifests, manifest -> new ManifestEntriesTable.ManifestReadTask(
+ ops.io(), manifest, fileSchema, schemaString, specString, residuals));
}
}
}
diff --git a/core/src/main/java/org/apache/iceberg/GenericDataFile.java b/core/src/main/java/org/apache/iceberg/GenericDataFile.java
index aa427cb..35065b1 100644
--- a/core/src/main/java/org/apache/iceberg/GenericDataFile.java
+++ b/core/src/main/java/org/apache/iceberg/GenericDataFile.java
@@ -20,6 +20,7 @@
package org.apache.iceberg;
import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -67,8 +68,6 @@ class GenericDataFile
// cached schema
private transient org.apache.avro.Schema avroSchema = null;
- private static final long DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024;
-
/**
* Used by Avro reflection to instantiate this class when reading manifest files.
*/
@@ -197,6 +196,11 @@ class GenericDataFile
}
@Override
+ public FileContent content() {
+ return FileContent.DATA;
+ }
+
+ @Override
public CharSequence path() {
return filePath;
}
@@ -274,22 +278,24 @@ class GenericDataFile
}
switch (pos) {
case 0:
- // always coerce to String for Serializable
- this.filePath = v.toString();
+ Preconditions.checkState(v == null || (Integer) v == FileContent.DATA.id(),
+ "Invalid content for a DataFile: %s", v);
return;
case 1:
- this.format = FileFormat.valueOf(v.toString());
+ // always coerce to String for Serializable
+ this.filePath = v.toString();
return;
case 2:
- this.partitionData = (PartitionData) v;
+ this.format = FileFormat.valueOf(v.toString());
return;
case 3:
- this.recordCount = (Long) v;
+ this.partitionData = (PartitionData) v;
return;
case 4:
- this.fileSizeInBytes = (Long) v;
+ this.recordCount = (Long) v;
return;
case 5:
+ this.fileSizeInBytes = (Long) v;
return;
case 6:
this.columnSizes = (Map<Integer, Long>) v;
@@ -331,19 +337,17 @@ class GenericDataFile
}
switch (pos) {
case 0:
- return filePath;
+ return FileContent.DATA.id();
case 1:
- return format != null ? format.toString() : null;
+ return filePath;
case 2:
- return partitionData;
+ return format != null ? format.toString() : null;
case 3:
- return recordCount;
+ return partitionData;
case 4:
- return fileSizeInBytes;
+ return recordCount;
case 5:
- // block_size_in_bytes is not used. However, it is a required avro field in DataFile. So
- // to maintain compatibility, we need to return something.
- return DEFAULT_BLOCK_SIZE;
+ return fileSizeInBytes;
case 6:
return columnSizes;
case 7:
@@ -377,7 +381,7 @@ class GenericDataFile
@Override
public int size() {
- return 13;
+ return DataFile.getType(EMPTY_STRUCT_TYPE).fields().size();
}
@Override
diff --git a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java
index 26f7153..66bab61 100644
--- a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java
+++ b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java
@@ -46,6 +46,7 @@ public class GenericManifestFile
private String manifestPath = null;
private Long length = null;
private int specId = -1;
+ private ManifestContent content = ManifestContent.DATA;
private long sequenceNumber = 0;
private long minSequenceNumber = 0;
private Long snapshotId = null;
@@ -101,7 +102,7 @@ public class GenericManifestFile
this.fromProjectionPos = null;
}
- public GenericManifestFile(String path, long length, int specId,
+ public GenericManifestFile(String path, long length, int specId, ManifestContent content,
long sequenceNumber, long minSequenceNumber, Long snapshotId,
int addedFilesCount, long addedRowsCount, int existingFilesCount,
long existingRowsCount, int deletedFilesCount, long deletedRowsCount,
@@ -110,6 +111,7 @@ public class GenericManifestFile
this.manifestPath = path;
this.length = length;
this.specId = specId;
+ this.content = content;
this.sequenceNumber = sequenceNumber;
this.minSequenceNumber = minSequenceNumber;
this.snapshotId = snapshotId;
@@ -133,6 +135,7 @@ public class GenericManifestFile
this.manifestPath = toCopy.manifestPath;
this.length = toCopy.length;
this.specId = toCopy.specId;
+ this.content = toCopy.content;
this.sequenceNumber = toCopy.sequenceNumber;
this.minSequenceNumber = toCopy.minSequenceNumber;
this.snapshotId = toCopy.snapshotId;
@@ -181,6 +184,11 @@ public class GenericManifestFile
}
@Override
+ public ManifestContent content() {
+ return content;
+ }
+
+ @Override
public long sequenceNumber() {
return sequenceNumber;
}
@@ -255,24 +263,26 @@ public class GenericManifestFile
case 2:
return specId;
case 3:
- return sequenceNumber;
+ return content.id();
case 4:
- return minSequenceNumber;
+ return sequenceNumber;
case 5:
- return snapshotId;
+ return minSequenceNumber;
case 6:
- return addedFilesCount;
+ return snapshotId;
case 7:
- return existingFilesCount;
+ return addedFilesCount;
case 8:
- return deletedFilesCount;
+ return existingFilesCount;
case 9:
- return addedRowsCount;
+ return deletedFilesCount;
case 10:
- return existingRowsCount;
+ return addedRowsCount;
case 11:
- return deletedRowsCount;
+ return existingRowsCount;
case 12:
+ return deletedRowsCount;
+ case 13:
return partitions;
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
@@ -299,33 +309,36 @@ public class GenericManifestFile
this.specId = (Integer) value;
return;
case 3:
- this.sequenceNumber = value != null ? (Long) value : 0;
+ this.content = value != null ? ManifestContent.values()[(Integer) value] : ManifestContent.DATA;
return;
case 4:
- this.minSequenceNumber = value != null ? (Long) value : 0;
+ this.sequenceNumber = value != null ? (Long) value : 0;
return;
case 5:
- this.snapshotId = (Long) value;
+ this.minSequenceNumber = value != null ? (Long) value : 0;
return;
case 6:
- this.addedFilesCount = (Integer) value;
+ this.snapshotId = (Long) value;
return;
case 7:
- this.existingFilesCount = (Integer) value;
+ this.addedFilesCount = (Integer) value;
return;
case 8:
- this.deletedFilesCount = (Integer) value;
+ this.existingFilesCount = (Integer) value;
return;
case 9:
- this.addedRowsCount = (Long) value;
+ this.deletedFilesCount = (Integer) value;
return;
case 10:
- this.existingRowsCount = (Long) value;
+ this.addedRowsCount = (Long) value;
return;
case 11:
- this.deletedRowsCount = (Long) value;
+ this.existingRowsCount = (Long) value;
return;
case 12:
+ this.deletedRowsCount = (Long) value;
+ return;
+ case 13:
this.partitions = (List<PartitionFieldSummary>) value;
return;
default:
@@ -393,7 +406,7 @@ public class GenericManifestFile
this.manifestFile = new GenericManifestFile((GenericManifestFile) toCopy);
} else {
this.manifestFile = new GenericManifestFile(
- toCopy.path(), toCopy.length(), toCopy.partitionSpecId(),
+ toCopy.path(), toCopy.length(), toCopy.partitionSpecId(), toCopy.content(),
toCopy.sequenceNumber(), toCopy.minSequenceNumber(), toCopy.snapshotId(),
toCopy.addedFilesCount(), toCopy.addedRowsCount(), toCopy.existingFilesCount(),
toCopy.existingRowsCount(), toCopy.deletedFilesCount(), toCopy.deletedRowsCount(),
diff --git a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java
index 1cf9c92..56d127e 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestListWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestListWriter.java
@@ -19,6 +19,7 @@
package org.apache.iceberg;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.Iterator;
@@ -114,6 +115,8 @@ abstract class ManifestListWriter implements FileAppender<ManifestFile> {
@Override
protected ManifestFile prepare(ManifestFile manifest) {
+ Preconditions.checkArgument(manifest.content() == ManifestContent.DATA,
+ "Cannot store delete manifests in a v1 table");
return wrapper.wrap(manifest);
}
diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
index ddd74c8..22e1011 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java
@@ -164,7 +164,8 @@ public abstract class ManifestWriter implements FileAppender<DataFile> {
// if the minSequenceNumber is null, then no manifests with a sequence number have been written, so the min
// sequence number is the one that will be assigned when this is committed. pass UNASSIGNED_SEQ to inherit it.
long minSeqNumber = minSequenceNumber != null ? minSequenceNumber : UNASSIGNED_SEQ;
- return new GenericManifestFile(file.location(), writer.length(), specId, UNASSIGNED_SEQ, minSeqNumber, snapshotId,
+ return new GenericManifestFile(file.location(), writer.length(), specId, ManifestContent.DATA,
+ UNASSIGNED_SEQ, minSeqNumber, snapshotId,
addedFiles, addedRows, existingFiles, existingRows, deletedFiles, deletedRows, stats.summaries());
}
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 4acc5f6..17f914b 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -393,8 +393,8 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
}
return new GenericManifestFile(manifest.path(), manifest.length(), manifest.partitionSpecId(),
- manifest.sequenceNumber(), manifest.minSequenceNumber(), snapshotId, addedFiles, addedRows, existingFiles,
- existingRows, deletedFiles, deletedRows, stats.summaries());
+ ManifestContent.DATA, manifest.sequenceNumber(), manifest.minSequenceNumber(), snapshotId,
+ addedFiles, addedRows, existingFiles, existingRows, deletedFiles, deletedRows, stats.summaries());
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to read manifest: %s", manifest.path());
diff --git a/core/src/main/java/org/apache/iceberg/V1Metadata.java b/core/src/main/java/org/apache/iceberg/V1Metadata.java
index a905b4d..3f38cd3 100644
--- a/core/src/main/java/org/apache/iceberg/V1Metadata.java
+++ b/core/src/main/java/org/apache/iceberg/V1Metadata.java
@@ -111,6 +111,11 @@ class V1Metadata {
}
@Override
+ public ManifestContent content() {
+ return wrapped.content();
+ }
+
+ @Override
public long sequenceNumber() {
return wrapped.sequenceNumber();
}
@@ -182,7 +187,7 @@ class V1Metadata {
}
static Schema entrySchema(Types.StructType partitionType) {
- return wrapFileSchema(DataFile.getType(partitionType));
+ return wrapFileSchema(dataFileSchema(partitionType));
}
static Schema wrapFileSchema(Types.StructType fileSchema) {
@@ -192,6 +197,29 @@ class V1Metadata {
required(ManifestEntry.DATA_FILE_ID, "data_file", fileSchema));
}
+ private static final Types.NestedField BLOCK_SIZE = required(105, "block_size_in_bytes", Types.LongType.get());
+
+ static Types.StructType dataFileSchema(Types.StructType partitionType) {
+ return Types.StructType.of(
+ DataFile.FILE_PATH,
+ DataFile.FILE_FORMAT,
+ required(DataFile.PARTITION_ID, DataFile.PARTITION_NAME, partitionType),
+ DataFile.RECORD_COUNT,
+ DataFile.FILE_SIZE,
+ BLOCK_SIZE,
+ DataFile.COLUMN_SIZES,
+ DataFile.VALUE_COUNTS,
+ DataFile.NULL_VALUE_COUNTS,
+ DataFile.LOWER_BOUNDS,
+ DataFile.UPPER_BOUNDS,
+ DataFile.KEY_METADATA,
+ DataFile.SPLIT_OFFSETS
+ );
+ }
+
+ /**
+ * Wrapper used to write a ManifestEntry to v1 metadata.
+ */
static class IndexedManifestEntry implements ManifestEntry, IndexedRecord {
private final org.apache.avro.Schema avroSchema;
private final IndexedDataFile fileWrapper;
@@ -226,11 +254,10 @@ class V1Metadata {
return wrapped.snapshotId();
case 2:
DataFile file = wrapped.file();
- if (file == null || file instanceof GenericDataFile) {
- return file;
- } else {
+ if (file != null) {
return fileWrapper.wrap(file);
}
+ return null;
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + i);
}
diff --git a/core/src/main/java/org/apache/iceberg/V2Metadata.java b/core/src/main/java/org/apache/iceberg/V2Metadata.java
index 6ee9d3f..acd18fc 100644
--- a/core/src/main/java/org/apache/iceberg/V2Metadata.java
+++ b/core/src/main/java/org/apache/iceberg/V2Metadata.java
@@ -20,7 +20,9 @@
package org.apache.iceberg;
import com.google.common.base.Preconditions;
+import java.nio.ByteBuffer;
import java.util.List;
+import java.util.Map;
import org.apache.avro.generic.IndexedRecord;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.types.Types;
@@ -31,33 +33,22 @@ class V2Metadata {
private V2Metadata() {
}
- // fields for v2 write schema for required metadata
- static final Types.NestedField REQUIRED_SNAPSHOT_ID =
- required(503, "added_snapshot_id", Types.LongType.get());
- static final Types.NestedField REQUIRED_ADDED_FILES_COUNT =
- required(504, "added_data_files_count", Types.IntegerType.get());
- static final Types.NestedField REQUIRED_EXISTING_FILES_COUNT =
- required(505, "existing_data_files_count", Types.IntegerType.get());
- static final Types.NestedField REQUIRED_DELETED_FILES_COUNT =
- required(506, "deleted_data_files_count", Types.IntegerType.get());
- static final Types.NestedField REQUIRED_ADDED_ROWS_COUNT =
- required(512, "added_rows_count", Types.LongType.get());
- static final Types.NestedField REQUIRED_EXISTING_ROWS_COUNT =
- required(513, "existing_rows_count", Types.LongType.get());
- static final Types.NestedField REQUIRED_DELETED_ROWS_COUNT =
- required(514, "deleted_rows_count", Types.LongType.get());
- static final Types.NestedField REQUIRED_SEQUENCE_NUMBER =
- required(515, "sequence_number", Types.LongType.get());
- static final Types.NestedField REQUIRED_MIN_SEQUENCE_NUMBER =
- required(516, "min_sequence_number", Types.LongType.get());
-
static final Schema MANIFEST_LIST_SCHEMA = new Schema(
- ManifestFile.PATH, ManifestFile.LENGTH, ManifestFile.SPEC_ID,
- REQUIRED_SEQUENCE_NUMBER, REQUIRED_MIN_SEQUENCE_NUMBER, REQUIRED_SNAPSHOT_ID,
- REQUIRED_ADDED_FILES_COUNT, REQUIRED_EXISTING_FILES_COUNT, REQUIRED_DELETED_FILES_COUNT,
- REQUIRED_ADDED_ROWS_COUNT, REQUIRED_EXISTING_ROWS_COUNT, REQUIRED_DELETED_ROWS_COUNT,
- ManifestFile.PARTITION_SUMMARIES);
-
+ ManifestFile.PATH,
+ ManifestFile.LENGTH,
+ ManifestFile.SPEC_ID,
+ ManifestFile.MANIFEST_CONTENT.asRequired(),
+ ManifestFile.SEQUENCE_NUMBER.asRequired(),
+ ManifestFile.MIN_SEQUENCE_NUMBER.asRequired(),
+ ManifestFile.SNAPSHOT_ID.asRequired(),
+ ManifestFile.ADDED_FILES_COUNT.asRequired(),
+ ManifestFile.EXISTING_FILES_COUNT.asRequired(),
+ ManifestFile.DELETED_FILES_COUNT.asRequired(),
+ ManifestFile.ADDED_ROWS_COUNT.asRequired(),
+ ManifestFile.EXISTING_ROWS_COUNT.asRequired(),
+ ManifestFile.DELETED_ROWS_COUNT.asRequired(),
+ ManifestFile.PARTITION_SUMMARIES
+ );
/**
* A wrapper class to write any ManifestFile implementation to Avro using the v2 write schema.
@@ -103,6 +94,8 @@ class V2Metadata {
case 2:
return wrapped.partitionSpecId();
case 3:
+ return wrapped.content().id();
+ case 4:
if (wrapped.sequenceNumber() == ManifestWriter.UNASSIGNED_SEQ) {
// if the sequence number is being assigned here, then the manifest must be created by the current
// operation. to validate this, check that the snapshot id matches the current commit
@@ -112,7 +105,7 @@ class V2Metadata {
} else {
return wrapped.sequenceNumber();
}
- case 4:
+ case 5:
if (wrapped.minSequenceNumber() == ManifestWriter.UNASSIGNED_SEQ) {
// same sanity check as above
Preconditions.checkState(commitSnapshotId == wrapped.snapshotId(),
@@ -123,21 +116,21 @@ class V2Metadata {
} else {
return wrapped.minSequenceNumber();
}
- case 5:
- return wrapped.snapshotId();
case 6:
- return wrapped.addedFilesCount();
+ return wrapped.snapshotId();
case 7:
- return wrapped.existingFilesCount();
+ return wrapped.addedFilesCount();
case 8:
- return wrapped.deletedFilesCount();
+ return wrapped.existingFilesCount();
case 9:
- return wrapped.addedRowsCount();
+ return wrapped.deletedFilesCount();
case 10:
- return wrapped.existingRowsCount();
+ return wrapped.addedRowsCount();
case 11:
- return wrapped.deletedRowsCount();
+ return wrapped.existingRowsCount();
case 12:
+ return wrapped.deletedRowsCount();
+ case 13:
return wrapped.partitions();
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
@@ -160,6 +153,11 @@ class V2Metadata {
}
@Override
+ public ManifestContent content() {
+ return wrapped.content();
+ }
+
+ @Override
public long sequenceNumber() {
return wrapped.sequenceNumber();
}
@@ -231,7 +229,7 @@ class V2Metadata {
}
static Schema entrySchema(Types.StructType partitionType) {
- return wrapFileSchema(DataFile.getType(partitionType));
+ return wrapFileSchema(fileType(partitionType));
}
static Schema wrapFileSchema(Types.StructType fileSchema) {
@@ -241,17 +239,34 @@ class V2Metadata {
required(ManifestEntry.DATA_FILE_ID, "data_file", fileSchema));
}
+ static Types.StructType fileType(Types.StructType partitionType) { // builds the struct type that entrySchema wraps as the "data_file" field
+ return Types.StructType.of( // field order matters: IndexedDataFile converts this type to Avro and its get(int) cases follow the same order
+ DataFile.CONTENT.asRequired(), // position 0; the content field is made required for the v2 write schema
+ DataFile.FILE_PATH,
+ DataFile.FILE_FORMAT,
+ required(DataFile.PARTITION_ID, DataFile.PARTITION_NAME, partitionType, DataFile.PARTITION_DOC), // partition field is rebuilt around the caller's partitionType
+ DataFile.RECORD_COUNT,
+ DataFile.FILE_SIZE,
+ DataFile.COLUMN_SIZES,
+ DataFile.VALUE_COUNTS,
+ DataFile.NULL_VALUE_COUNTS,
+ DataFile.LOWER_BOUNDS,
+ DataFile.UPPER_BOUNDS,
+ DataFile.KEY_METADATA,
+ DataFile.SPLIT_OFFSETS // position 12, the last ordinal handled by IndexedDataFile.get
+ );
+ }
+
static class IndexedManifestEntry implements ManifestEntry, IndexedRecord {
private final org.apache.avro.Schema avroSchema;
private final Long commitSnapshotId;
- private final V1Metadata.IndexedDataFile fileWrapper;
+ private final IndexedDataFile fileWrapper;
private ManifestEntry wrapped = null;
IndexedManifestEntry(Long commitSnapshotId, Types.StructType partitionType) {
this.avroSchema = AvroSchemaUtil.convert(entrySchema(partitionType), "manifest_entry");
this.commitSnapshotId = commitSnapshotId;
- // TODO: when v2 data files differ from v1, this should use a v2 wrapper
- this.fileWrapper = new V1Metadata.IndexedDataFile(avroSchema.getField("data_file").schema());
+ this.fileWrapper = new IndexedDataFile(partitionType);
}
public IndexedManifestEntry wrap(ManifestEntry entry) {
@@ -334,4 +349,136 @@ class V2Metadata {
return wrapped.copyWithoutStats();
}
}
+
+ /**
+ * Wrapper used to write a DataFile to v2 metadata: it exposes the wrapped file by field position as an Avro IndexedRecord using the fileType schema, and delegates every DataFile accessor to the wrapped file.
+ */
+ static class IndexedDataFile implements DataFile, IndexedRecord {
+ private final org.apache.avro.Schema avroSchema;
+ private final IndexedStructLike partitionWrapper;
+ private DataFile wrapped = null; // assigned by wrap(); every accessor dereferences it without a null check, so wrap() must be called first
+
+ IndexedDataFile(Types.StructType partitionType) {
+ this.avroSchema = AvroSchemaUtil.convert(fileType(partitionType), "data_file"); // Avro schema derived from fileType, so positions follow that field order
+ this.partitionWrapper = new IndexedStructLike(avroSchema.getField("partition").schema()); // one wrapper, built from the "partition" field schema and re-pointed on each get(3)
+ }
+
+ IndexedDataFile wrap(DataFile file) { // replaces the wrapped file in place and returns this same instance
+ this.wrapped = file;
+ return this;
+ }
+
+ @Override
+ public org.apache.avro.Schema getSchema() {
+ return avroSchema;
+ }
+
+ @Override
+ public Object get(int pos) { // returns the value for the field at position pos; cases mirror the order of fileType
+ switch (pos) {
+ case 0:
+ return FileContent.DATA.id(); // content is always written as DATA; it is not read from the wrapped file
+ case 1:
+ return wrapped.path().toString(); // the CharSequence path is written as a String
+ case 2:
+ return wrapped.format() != null ? wrapped.format().toString() : null; // a missing format is passed through as null
+ case 3:
+ return partitionWrapper.wrap(wrapped.partition()); // partition is handed out through the partition wrapper, not directly
+ case 4:
+ return wrapped.recordCount();
+ case 5:
+ return wrapped.fileSizeInBytes();
+ case 6:
+ return wrapped.columnSizes();
+ case 7:
+ return wrapped.valueCounts();
+ case 8:
+ return wrapped.nullValueCounts();
+ case 9:
+ return wrapped.lowerBounds();
+ case 10:
+ return wrapped.upperBounds();
+ case 11:
+ return wrapped.keyMetadata();
+ case 12:
+ return wrapped.splitOffsets();
+ }
+ throw new IllegalArgumentException("Unknown field ordinal: " + pos); // reached only when pos matches none of the cases above
+ }
+
+ @Override
+ public void put(int i, Object v) { // this wrapper only serves values out; it never accepts them
+ throw new UnsupportedOperationException("Cannot read into IndexedDataFile");
+ }
+
+ @Override
+ public CharSequence path() {
+ return wrapped.path();
+ }
+
+ @Override
+ public FileFormat format() {
+ return wrapped.format();
+ }
+
+ @Override
+ public StructLike partition() {
+ return wrapped.partition();
+ }
+
+ @Override
+ public long recordCount() {
+ return wrapped.recordCount();
+ }
+
+ @Override
+ public long fileSizeInBytes() {
+ return wrapped.fileSizeInBytes();
+ }
+
+ @Override
+ public Map<Integer, Long> columnSizes() {
+ return wrapped.columnSizes();
+ }
+
+ @Override
+ public Map<Integer, Long> valueCounts() {
+ return wrapped.valueCounts();
+ }
+
+ @Override
+ public Map<Integer, Long> nullValueCounts() {
+ return wrapped.nullValueCounts();
+ }
+
+ @Override
+ public Map<Integer, ByteBuffer> lowerBounds() {
+ return wrapped.lowerBounds();
+ }
+
+ @Override
+ public Map<Integer, ByteBuffer> upperBounds() {
+ return wrapped.upperBounds();
+ }
+
+ @Override
+ public ByteBuffer keyMetadata() {
+ return wrapped.keyMetadata();
+ }
+
+ @Override
+ public List<Long> splitOffsets() {
+ return wrapped.splitOffsets();
+ }
+
+ @Override
+ public DataFile copy() { // delegates to the wrapped file, so the result is whatever wrapped.copy() returns, not this wrapper
+ return wrapped.copy();
+ }
+
+ @Override
+ public DataFile copyWithoutStats() { // same delegation as copy()
+ return wrapped.copyWithoutStats();
+ }
+ }
}
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java b/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java
index 65ec4b2..7742eb4 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestListVersions.java
@@ -55,7 +55,12 @@ public class TestManifestListVersions {
private static final List<ManifestFile.PartitionFieldSummary> PARTITION_SUMMARIES = ImmutableList.of();
private static final ManifestFile TEST_MANIFEST = new GenericManifestFile(
- PATH, LENGTH, SPEC_ID, SEQ_NUM, MIN_SEQ_NUM, SNAPSHOT_ID,
+ PATH, LENGTH, SPEC_ID, ManifestContent.DATA, SEQ_NUM, MIN_SEQ_NUM, SNAPSHOT_ID,
+ ADDED_FILES, ADDED_ROWS, EXISTING_FILES, EXISTING_ROWS, DELETED_FILES, DELETED_ROWS,
+ PARTITION_SUMMARIES);
+
+ private static final ManifestFile TEST_DELETE_MANIFEST = new GenericManifestFile(
+ PATH, LENGTH, SPEC_ID, ManifestContent.DELETES, SEQ_NUM, MIN_SEQ_NUM, SNAPSHOT_ID,
ADDED_FILES, ADDED_ROWS, EXISTING_FILES, EXISTING_ROWS, DELETED_FILES, DELETED_ROWS,
PARTITION_SUMMARIES);
@@ -63,6 +68,13 @@ public class TestManifestListVersions {
public TemporaryFolder temp = new TemporaryFolder();
@Test
+ public void testV1WriteDeleteManifest() {
+ AssertHelpers.assertThrows("Should fail to write a DELETE manifest to v1",
+ IllegalArgumentException.class, "Cannot store delete manifests in a v1 table",
+ () -> writeManifestList(TEST_DELETE_MANIFEST, 1));
+ }
+
+ @Test
public void testV1Write() throws IOException {
ManifestFile manifest = writeAndReadManifestList(1);
@@ -74,6 +86,7 @@ public class TestManifestListVersions {
Assert.assertEquals("Path", PATH, manifest.path());
Assert.assertEquals("Length", LENGTH, manifest.length());
Assert.assertEquals("Spec id", SPEC_ID, manifest.partitionSpecId());
+ Assert.assertEquals("Content", ManifestContent.DATA, manifest.content());
Assert.assertEquals("Snapshot id", SNAPSHOT_ID, (long) manifest.snapshotId());
Assert.assertEquals("Added files count", ADDED_FILES, (int) manifest.addedFilesCount());
Assert.assertEquals("Existing files count", EXISTING_FILES, (int) manifest.existingFilesCount());
@@ -91,6 +104,7 @@ public class TestManifestListVersions {
Assert.assertEquals("Path", PATH, manifest.path());
Assert.assertEquals("Length", LENGTH, manifest.length());
Assert.assertEquals("Spec id", SPEC_ID, manifest.partitionSpecId());
+ Assert.assertEquals("Content", ManifestContent.DATA, manifest.content());
Assert.assertEquals("Sequence number", SEQ_NUM, manifest.sequenceNumber());
Assert.assertEquals("Min sequence number", MIN_SEQ_NUM, manifest.minSequenceNumber());
Assert.assertEquals("Snapshot id", SNAPSHOT_ID, (long) manifest.snapshotId());
@@ -104,7 +118,7 @@ public class TestManifestListVersions {
@Test
public void testV1ForwardCompatibility() throws IOException {
- InputFile manifestList = writeManifestList(1);
+ InputFile manifestList = writeManifestList(TEST_MANIFEST, 1);
GenericData.Record generic = readGeneric(manifestList, V1Metadata.MANIFEST_LIST_SCHEMA);
// v1 metadata should match even though order changed
@@ -118,12 +132,15 @@ public class TestManifestListVersions {
Assert.assertEquals("Added rows count", ADDED_ROWS, (long) generic.get("added_rows_count"));
Assert.assertEquals("Existing rows count", EXISTING_ROWS, (long) generic.get("existing_rows_count"));
Assert.assertEquals("Deleted rows count", DELETED_ROWS, (long) generic.get("deleted_rows_count"));
+ Assert.assertNull("Content", generic.get(ManifestFile.MANIFEST_CONTENT.name()));
+ Assert.assertNull("Sequence number", generic.get(ManifestFile.SEQUENCE_NUMBER.name()));
+ Assert.assertNull("Min sequence number", generic.get(ManifestFile.MIN_SEQUENCE_NUMBER.name()));
}
@Test
public void testV2ForwardCompatibility() throws IOException {
- // v2 manifest list files can be read by v1 readers, but the sequence numbers will be ignored.
- InputFile manifestList = writeManifestList(2);
+ // v2 manifest list files can be read by v1 readers, but the sequence numbers and content will be ignored.
+ InputFile manifestList = writeManifestList(TEST_MANIFEST, 2);
GenericData.Record generic = readGeneric(manifestList, V1Metadata.MANIFEST_LIST_SCHEMA);
// v1 metadata should match even though order changed
@@ -137,6 +154,9 @@ public class TestManifestListVersions {
Assert.assertEquals("Added rows count", ADDED_ROWS, (long) generic.get("added_rows_count"));
Assert.assertEquals("Existing rows count", EXISTING_ROWS, (long) generic.get("existing_rows_count"));
Assert.assertEquals("Deleted rows count", DELETED_ROWS, (long) generic.get("deleted_rows_count"));
+ Assert.assertNull("Content", generic.get(ManifestFile.MANIFEST_CONTENT.name()));
+ Assert.assertNull("Sequence number", generic.get(ManifestFile.SEQUENCE_NUMBER.name()));
+ Assert.assertNull("Min sequence number", generic.get(ManifestFile.MIN_SEQUENCE_NUMBER.name()));
}
@Test
@@ -187,11 +207,11 @@ public class TestManifestListVersions {
Assert.assertNull("Deleted rows count should be null", manifest.deletedRowsCount());
}
- private InputFile writeManifestList(int formatVersion) throws IOException {
+ private InputFile writeManifestList(ManifestFile manifest, int formatVersion) throws IOException {
OutputFile manifestList = Files.localOutput(temp.newFile());
try (FileAppender<ManifestFile> writer = ManifestLists.write(
formatVersion, manifestList, SNAPSHOT_ID, SNAPSHOT_ID - 1, formatVersion > 1 ? SEQ_NUM : 0)) {
- writer.add(TEST_MANIFEST);
+ writer.add(manifest);
}
return manifestList.toInputFile();
}
@@ -208,7 +228,7 @@ public class TestManifestListVersions {
}
private ManifestFile writeAndReadManifestList(int formatVersion) throws IOException {
- List<ManifestFile> manifests = ManifestLists.read(writeManifestList(formatVersion));
+ List<ManifestFile> manifests = ManifestLists.read(writeManifestList(TEST_MANIFEST, formatVersion));
Assert.assertEquals("Should contain one manifest", 1, manifests.size());
return manifests.get(0);
}
diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
index e993256..a020611 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceTablesBase.java
@@ -27,6 +27,7 @@ import java.util.List;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileContent;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
@@ -125,8 +126,9 @@ public abstract class TestIcebergSourceTablesBase {
try (CloseableIterable<GenericData.Record> rows = Avro.read(manifest).project(entriesTable.schema()).build()) {
// each row must inherit snapshot_id and sequence_number
rows.forEach(row -> {
- row.put(1, snapshot.snapshotId());
row.put(2, 0L);
+ GenericData.Record file = (GenericData.Record) row.get("data_file");
+ file.put(0, FileContent.DATA.id());
expected.add(row);
});
}
@@ -172,9 +174,13 @@ public abstract class TestIcebergSourceTablesBase {
for (ManifestFile manifest : Iterables.concat(Iterables.transform(table.snapshots(), Snapshot::manifests))) {
InputFile in = table.io().newInputFile(manifest.path());
try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
- for (GenericData.Record record : rows) {
- expected.add(record);
- }
+ // each row must inherit snapshot_id and sequence_number
+ rows.forEach(row -> {
+ row.put(2, 0L);
+ GenericData.Record file = (GenericData.Record) row.get("data_file");
+ file.put(0, FileContent.DATA.id());
+ expected.add(row);
+ });
}
}
@@ -222,7 +228,9 @@ public abstract class TestIcebergSourceTablesBase {
try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
for (GenericData.Record record : rows) {
if ((Integer) record.get("status") < 2 /* added or existing */) {
- expected.add((GenericData.Record) record.get("data_file"));
+ GenericData.Record file = (GenericData.Record) record.get("data_file");
+ file.put(0, FileContent.DATA.id());
+ expected.add(file);
}
}
}
@@ -248,14 +256,14 @@ public abstract class TestIcebergSourceTablesBase {
new SimpleRecord(2, "b")
);
- try {
- Dataset<Row> inputDF = spark.createDataFrame(records, SimpleRecord.class);
- inputDF.select("id", "data").write()
- .format("parquet")
- .mode("append")
- .partitionBy("id")
- .saveAsTable("parquet_table");
+ Dataset<Row> inputDF = spark.createDataFrame(records, SimpleRecord.class);
+ inputDF.select("id", "data").write()
+ .format("parquet")
+ .mode("overwrite")
+ .partitionBy("id")
+ .saveAsTable("parquet_table");
+ try {
String stagingLocation = table.location() + "/metadata";
SparkTableUtil.importSparkTable(spark,
new org.apache.spark.sql.catalyst.TableIdentifier("parquet_table"),
@@ -271,7 +279,9 @@ public abstract class TestIcebergSourceTablesBase {
InputFile in = table.io().newInputFile(manifest.path());
try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
for (GenericData.Record record : rows) {
- expected.add((GenericData.Record) record.get("data_file"));
+ GenericData.Record file = (GenericData.Record) record.get("data_file");
+ file.put(0, FileContent.DATA.id());
+ expected.add(file);
}
}
}
@@ -301,14 +311,14 @@ public abstract class TestIcebergSourceTablesBase {
new SimpleRecord(2, "b")
);
- try {
- Dataset<Row> inputDF = spark.createDataFrame(records, SimpleRecord.class);
- inputDF.select("id", "data").write()
- .format("parquet")
- .mode("append")
- .partitionBy("id")
- .saveAsTable("parquet_table");
+ Dataset<Row> inputDF = spark.createDataFrame(records, SimpleRecord.class);
+ inputDF.select("id", "data").write()
+ .format("parquet")
+ .mode("overwrite")
+ .partitionBy("id")
+ .saveAsTable("parquet_table");
+ try {
String stagingLocation = table.location() + "/metadata";
SparkTableUtil.importSparkTable(
spark, new org.apache.spark.sql.catalyst.TableIdentifier("parquet_table"), table, stagingLocation);
@@ -371,7 +381,9 @@ public abstract class TestIcebergSourceTablesBase {
try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
for (GenericData.Record record : rows) {
if ((Integer) record.get("status") < 2 /* added or existing */) {
- expected.add((GenericData.Record) record.get("data_file"));
+ GenericData.Record file = (GenericData.Record) record.get("data_file");
+ file.put(0, FileContent.DATA.id());
+ expected.add(file);
}
}
}
@@ -458,7 +470,7 @@ public abstract class TestIcebergSourceTablesBase {
.load(loadLocation(tableIdentifier, "all_data_files"))
.orderBy("file_path")
.collectAsList();
- actual.sort(Comparator.comparing(o -> o.getString(0)));
+ actual.sort(Comparator.comparing(o -> o.getString(1)));
List<GenericData.Record> expected = Lists.newArrayList();
for (ManifestFile manifest : Iterables.concat(Iterables.transform(table.snapshots(), Snapshot::manifests))) {
@@ -466,7 +478,9 @@ public abstract class TestIcebergSourceTablesBase {
try (CloseableIterable<GenericData.Record> rows = Avro.read(in).project(entriesTable.schema()).build()) {
for (GenericData.Record record : rows) {
if ((Integer) record.get("status") < 2 /* added or existing */) {
- expected.add((GenericData.Record) record.get("data_file"));
+ GenericData.Record file = (GenericData.Record) record.get("data_file");
+ file.put(0, FileContent.DATA.id());
+ expected.add(file);
}
}
}