You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/04/28 01:01:52 UTC
[incubator-iceberg] branch master updated: Use correct manifest versions while rewriting manifests (#980)
This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 02bc389 Use correct manifest versions while rewriting manifests (#980)
02bc389 is described below
commit 02bc389cab351bf176742a8ad42608b1eb895e9f
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Mon Apr 27 18:01:44 2020 -0700
Use correct manifest versions while rewriting manifests (#980)
---
.../java/org/apache/iceberg/ManifestFiles.java | 5 ++--
.../java/org/apache/iceberg/TableTestBase.java | 6 ++---
.../iceberg/actions/RewriteManifestsAction.java | 30 ++++++++++++++--------
3 files changed, 25 insertions(+), 16 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
index 2462d07..203d690 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
@@ -71,8 +71,7 @@ public class ManifestFiles {
* @return a manifest writer
*/
public static ManifestWriter write(PartitionSpec spec, OutputFile outputFile) {
- // always use a v2 writer to preserve sequence numbers, but use null for sequence number so appends inherit
- return write(2, spec, outputFile, null);
+ return write(1, spec, outputFile, null);
}
/**
@@ -84,7 +83,7 @@ public class ManifestFiles {
* @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID
* @return a manifest writer
*/
- static ManifestWriter write(int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) {
+ public static ManifestWriter write(int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) {
switch (formatVersion) {
case 1:
return new ManifestWriter.V1Writer(spec, outputFile, snapshotId);
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index c66e8b1..ef4466d 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -136,7 +136,7 @@ public class TableTestBase {
Assert.assertTrue(manifestFile.delete());
OutputFile outputFile = table.ops().io().newOutputFile(manifestFile.getCanonicalPath());
- ManifestWriter writer = ManifestFiles.write(table.spec(), outputFile);
+ ManifestWriter writer = ManifestFiles.write(formatVersion, table.spec(), outputFile, null);
try {
for (DataFile file : files) {
writer.add(file);
@@ -153,7 +153,7 @@ public class TableTestBase {
Assert.assertTrue(manifestFile.delete());
OutputFile outputFile = table.ops().io().newOutputFile(manifestFile.getCanonicalPath());
- ManifestWriter writer = ManifestFiles.write(table.spec(), outputFile);
+ ManifestWriter writer = ManifestFiles.write(formatVersion, table.spec(), outputFile, null);
try {
for (ManifestEntry entry : entries) {
writer.addEntry(entry);
@@ -170,7 +170,7 @@ public class TableTestBase {
Assert.assertTrue(manifestFile.delete());
OutputFile outputFile = table.ops().io().newOutputFile(manifestFile.getCanonicalPath());
- ManifestWriter writer = ManifestFiles.write(table.spec(), outputFile);
+ ManifestWriter writer = ManifestFiles.write(formatVersion, table.spec(), outputFile, null);
try {
for (DataFile file : files) {
writer.add(file);
diff --git a/spark/src/main/java/org/apache/iceberg/actions/RewriteManifestsAction.java b/spark/src/main/java/org/apache/iceberg/actions/RewriteManifestsAction.java
index b726fde..b8f88ea 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/RewriteManifestsAction.java
+++ b/spark/src/main/java/org/apache/iceberg/actions/RewriteManifestsAction.java
@@ -85,6 +85,7 @@ public class RewriteManifestsAction
private final JavaSparkContext sparkContext;
private final Encoder<ManifestFile> manifestEncoder;
private final Table table;
+ private final int formatVersion;
private final FileIO fileIO;
private final long targetManifestSizeBytes;
@@ -116,6 +117,9 @@ public class RewriteManifestsAction
TableOperations ops = ((HasTableOperations) table).operations();
Path metadataFilePath = new Path(ops.metadataFileLocation("file"));
this.stagingLocation = metadataFilePath.getParent().toString();
+
+ // use the current table format version for new manifests
+ this.formatVersion = ops.current().formatVersion();
}
@Override
@@ -228,7 +232,10 @@ public class RewriteManifestsAction
return manifestEntryDF
.repartition(numManifests)
- .mapPartitions(toManifests(io, maxNumManifestEntries, stagingLocation, spec, sparkType), manifestEncoder)
+ .mapPartitions(
+ toManifests(io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType),
+ manifestEncoder
+ )
.collectAsList();
}
@@ -246,7 +253,10 @@ public class RewriteManifestsAction
Column partitionColumn = df.col("data_file.partition");
return df.repartitionByRange(numManifests, partitionColumn)
.sortWithinPartitions(partitionColumn)
- .mapPartitions(toManifests(io, maxNumManifestEntries, stagingLocation, spec, sparkType), manifestEncoder)
+ .mapPartitions(
+ toManifests(io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType),
+ manifestEncoder
+ )
.collectAsList();
});
}
@@ -328,16 +338,16 @@ public class RewriteManifestsAction
private static ManifestFile writeManifest(
List<Row> rows, int startIndex, int endIndex, Broadcast<FileIO> io,
- String stagingLocation, PartitionSpec spec, StructType sparkType) throws IOException {
+ String location, int format, PartitionSpec spec, StructType sparkType) throws IOException {
String manifestName = "optimized-m-" + UUID.randomUUID();
- Path manifestPath = new Path(stagingLocation, manifestName);
+ Path manifestPath = new Path(location, manifestName);
OutputFile outputFile = io.value().newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));
Types.StructType dataFileType = DataFile.getType(spec.partitionType());
SparkDataFile wrapper = new SparkDataFile(dataFileType, sparkType);
- ManifestWriter writer = ManifestFiles.write(spec, outputFile);
+ ManifestWriter writer = ManifestFiles.write(format, spec, outputFile, null);
try {
for (int index = startIndex; index < endIndex; index++) {
@@ -355,8 +365,8 @@ public class RewriteManifestsAction
}
private static MapPartitionsFunction<Row, ManifestFile> toManifests(
- Broadcast<FileIO> io, long maxNumManifestEntries, String stagingLocation,
- PartitionSpec spec, StructType sparkType) {
+ Broadcast<FileIO> io, long maxNumManifestEntries, String location,
+ int format, PartitionSpec spec, StructType sparkType) {
return (MapPartitionsFunction<Row, ManifestFile>) rows -> {
List<Row> rowsAsList = Lists.newArrayList(rows);
@@ -367,11 +377,11 @@ public class RewriteManifestsAction
List<ManifestFile> manifests = Lists.newArrayList();
if (rowsAsList.size() <= maxNumManifestEntries) {
- manifests.add(writeManifest(rowsAsList, 0, rowsAsList.size(), io, stagingLocation, spec, sparkType));
+ manifests.add(writeManifest(rowsAsList, 0, rowsAsList.size(), io, location, format, spec, sparkType));
} else {
int midIndex = rowsAsList.size() / 2;
- manifests.add(writeManifest(rowsAsList, 0, midIndex, io, stagingLocation, spec, sparkType));
- manifests.add(writeManifest(rowsAsList, midIndex, rowsAsList.size(), io, stagingLocation, spec, sparkType));
+ manifests.add(writeManifest(rowsAsList, 0, midIndex, io, location, format, spec, sparkType));
+ manifests.add(writeManifest(rowsAsList, midIndex, rowsAsList.size(), io, location, format, spec, sparkType));
}
return manifests.iterator();