You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@iceberg.apache.org by bl...@apache.org on 2020/04/28 01:01:52 UTC

[incubator-iceberg] branch master updated: Use correct manifest versions while rewriting manifests (#980)

This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 02bc389  Use correct manifest versions while rewriting manifests (#980)
02bc389 is described below

commit 02bc389cab351bf176742a8ad42608b1eb895e9f
Author: Anton Okolnychyi <ao...@apple.com>
AuthorDate: Mon Apr 27 18:01:44 2020 -0700

    Use correct manifest versions while rewriting manifests (#980)
---
 .../java/org/apache/iceberg/ManifestFiles.java     |  5 ++--
 .../java/org/apache/iceberg/TableTestBase.java     |  6 ++---
 .../iceberg/actions/RewriteManifestsAction.java    | 30 ++++++++++++++--------
 3 files changed, 25 insertions(+), 16 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/ManifestFiles.java b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
index 2462d07..203d690 100644
--- a/core/src/main/java/org/apache/iceberg/ManifestFiles.java
+++ b/core/src/main/java/org/apache/iceberg/ManifestFiles.java
@@ -71,8 +71,7 @@ public class ManifestFiles {
    * @return a manifest writer
    */
   public static ManifestWriter write(PartitionSpec spec, OutputFile outputFile) {
-    // always use a v2 writer to preserve sequence numbers, but use null for sequence number so appends inherit
-    return write(2, spec, outputFile, null);
+    return write(1, spec, outputFile, null);
   }
 
   /**
@@ -84,7 +83,7 @@ public class ManifestFiles {
    * @param snapshotId a snapshot ID for the manifest entries, or null for an inherited ID
    * @return a manifest writer
    */
-  static ManifestWriter write(int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) {
+  public static ManifestWriter write(int formatVersion, PartitionSpec spec, OutputFile outputFile, Long snapshotId) {
     switch (formatVersion) {
       case 1:
         return new ManifestWriter.V1Writer(spec, outputFile, snapshotId);
diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java
index c66e8b1..ef4466d 100644
--- a/core/src/test/java/org/apache/iceberg/TableTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java
@@ -136,7 +136,7 @@ public class TableTestBase {
     Assert.assertTrue(manifestFile.delete());
     OutputFile outputFile = table.ops().io().newOutputFile(manifestFile.getCanonicalPath());
 
-    ManifestWriter writer = ManifestFiles.write(table.spec(), outputFile);
+    ManifestWriter writer = ManifestFiles.write(formatVersion, table.spec(), outputFile, null);
     try {
       for (DataFile file : files) {
         writer.add(file);
@@ -153,7 +153,7 @@ public class TableTestBase {
     Assert.assertTrue(manifestFile.delete());
     OutputFile outputFile = table.ops().io().newOutputFile(manifestFile.getCanonicalPath());
 
-    ManifestWriter writer = ManifestFiles.write(table.spec(), outputFile);
+    ManifestWriter writer = ManifestFiles.write(formatVersion, table.spec(), outputFile, null);
     try {
       for (ManifestEntry entry : entries) {
         writer.addEntry(entry);
@@ -170,7 +170,7 @@ public class TableTestBase {
     Assert.assertTrue(manifestFile.delete());
     OutputFile outputFile = table.ops().io().newOutputFile(manifestFile.getCanonicalPath());
 
-    ManifestWriter writer = ManifestFiles.write(table.spec(), outputFile);
+    ManifestWriter writer = ManifestFiles.write(formatVersion, table.spec(), outputFile, null);
     try {
       for (DataFile file : files) {
         writer.add(file);
diff --git a/spark/src/main/java/org/apache/iceberg/actions/RewriteManifestsAction.java b/spark/src/main/java/org/apache/iceberg/actions/RewriteManifestsAction.java
index b726fde..b8f88ea 100644
--- a/spark/src/main/java/org/apache/iceberg/actions/RewriteManifestsAction.java
+++ b/spark/src/main/java/org/apache/iceberg/actions/RewriteManifestsAction.java
@@ -85,6 +85,7 @@ public class RewriteManifestsAction
   private final JavaSparkContext sparkContext;
   private final Encoder<ManifestFile> manifestEncoder;
   private final Table table;
+  private final int formatVersion;
   private final FileIO fileIO;
   private final long targetManifestSizeBytes;
 
@@ -116,6 +117,9 @@ public class RewriteManifestsAction
     TableOperations ops = ((HasTableOperations) table).operations();
     Path metadataFilePath = new Path(ops.metadataFileLocation("file"));
     this.stagingLocation = metadataFilePath.getParent().toString();
+
+    // use the current table format version for new manifests
+    this.formatVersion = ops.current().formatVersion();
   }
 
   @Override
@@ -228,7 +232,10 @@ public class RewriteManifestsAction
 
     return manifestEntryDF
         .repartition(numManifests)
-        .mapPartitions(toManifests(io, maxNumManifestEntries, stagingLocation, spec, sparkType), manifestEncoder)
+        .mapPartitions(
+            toManifests(io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType),
+            manifestEncoder
+        )
         .collectAsList();
   }
 
@@ -246,7 +253,10 @@ public class RewriteManifestsAction
       Column partitionColumn = df.col("data_file.partition");
       return df.repartitionByRange(numManifests, partitionColumn)
           .sortWithinPartitions(partitionColumn)
-          .mapPartitions(toManifests(io, maxNumManifestEntries, stagingLocation, spec, sparkType), manifestEncoder)
+          .mapPartitions(
+              toManifests(io, maxNumManifestEntries, stagingLocation, formatVersion, spec, sparkType),
+              manifestEncoder
+          )
           .collectAsList();
     });
   }
@@ -328,16 +338,16 @@ public class RewriteManifestsAction
 
   private static ManifestFile writeManifest(
       List<Row> rows, int startIndex, int endIndex, Broadcast<FileIO> io,
-      String stagingLocation, PartitionSpec spec, StructType sparkType) throws IOException {
+      String location, int format, PartitionSpec spec, StructType sparkType) throws IOException {
 
     String manifestName = "optimized-m-" + UUID.randomUUID();
-    Path manifestPath = new Path(stagingLocation, manifestName);
+    Path manifestPath = new Path(location, manifestName);
     OutputFile outputFile = io.value().newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));
 
     Types.StructType dataFileType = DataFile.getType(spec.partitionType());
     SparkDataFile wrapper = new SparkDataFile(dataFileType, sparkType);
 
-    ManifestWriter writer = ManifestFiles.write(spec, outputFile);
+    ManifestWriter writer = ManifestFiles.write(format, spec, outputFile, null);
 
     try {
       for (int index = startIndex; index < endIndex; index++) {
@@ -355,8 +365,8 @@ public class RewriteManifestsAction
   }
 
   private static MapPartitionsFunction<Row, ManifestFile> toManifests(
-      Broadcast<FileIO> io, long maxNumManifestEntries, String stagingLocation,
-      PartitionSpec spec, StructType sparkType) {
+      Broadcast<FileIO> io, long maxNumManifestEntries, String location,
+      int format, PartitionSpec spec, StructType sparkType) {
 
     return (MapPartitionsFunction<Row, ManifestFile>) rows -> {
       List<Row> rowsAsList = Lists.newArrayList(rows);
@@ -367,11 +377,11 @@ public class RewriteManifestsAction
 
       List<ManifestFile> manifests = Lists.newArrayList();
       if (rowsAsList.size() <= maxNumManifestEntries) {
-        manifests.add(writeManifest(rowsAsList, 0, rowsAsList.size(), io, stagingLocation, spec, sparkType));
+        manifests.add(writeManifest(rowsAsList, 0, rowsAsList.size(), io, location, format, spec, sparkType));
       } else {
         int midIndex = rowsAsList.size() / 2;
-        manifests.add(writeManifest(rowsAsList, 0, midIndex, io, stagingLocation, spec, sparkType));
-        manifests.add(writeManifest(rowsAsList,  midIndex, rowsAsList.size(), io, stagingLocation, spec, sparkType));
+        manifests.add(writeManifest(rowsAsList, 0, midIndex, io, location, format, spec, sparkType));
+        manifests.add(writeManifest(rowsAsList,  midIndex, rowsAsList.size(), io, location, format, spec, sparkType));
       }
 
       return manifests.iterator();