You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2022/12/01 17:35:50 UTC

[iceberg] branch master updated: Spark 2.4: Preserve file seq numbers while rewriting manifests (#6289)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new bf8e68b4a7 Spark 2.4: Preserve file seq numbers while rewriting manifests (#6289)
bf8e68b4a7 is described below

commit bf8e68b4a7b4cca187ab0b152da9d5e98d4a45d8
Author: Eduard Tudenhöfner <et...@gmail.com>
AuthorDate: Thu Dec 1 18:35:44 2022 +0100

    Spark 2.4: Preserve file seq numbers while rewriting manifests (#6289)
---
 .../actions/BaseRewriteManifestsSparkAction.java   | 13 ++--
 .../java/org/apache/iceberg/ValidationHelpers.java | 77 ++++++++++++++++++++++
 .../spark/actions/TestRewriteManifestsAction.java  | 68 +++++++++++++++++++
 3 files changed, 154 insertions(+), 4 deletions(-)

diff --git a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
index c1b9537c5a..4cca7caf6e 100644
--- a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
+++ b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
@@ -197,12 +197,16 @@ public class BaseRewriteManifestsSparkAction
         loadMetadataTable(table, ENTRIES)
             .filter("status < 2") // select only live entries
             .selectExpr(
-                "input_file_name() as manifest", "snapshot_id", "sequence_number", "data_file");
+                "input_file_name() as manifest",
+                "snapshot_id",
+                "sequence_number",
+                "file_sequence_number",
+                "data_file");
 
     Column joinCond = manifestDF.col("manifest").equalTo(manifestEntryDF.col("manifest"));
     return manifestEntryDF
         .join(manifestDF, joinCond, "left_semi")
-        .select("snapshot_id", "sequence_number", "data_file");
+        .select("snapshot_id", "sequence_number", "file_sequence_number", "data_file");
   }
 
   private List<ManifestFile> writeManifestsForUnpartitionedTable(
@@ -355,8 +359,9 @@ public class BaseRewriteManifestsSparkAction
         Row row = rows.get(index);
         long snapshotId = row.getLong(0);
         long sequenceNumber = row.getLong(1);
-        Row file = row.getStruct(2);
-        writer.existing(wrapper.wrap(file), snapshotId, sequenceNumber);
+        Long fileSequenceNumber = row.isNullAt(2) ? null : row.getLong(2);
+        Row file = row.getStruct(3);
+        writer.existing(wrapper.wrap(file), snapshotId, sequenceNumber, fileSequenceNumber);
       }
     } finally {
       writer.close();
diff --git a/spark/v2.4/spark/src/test/java/org/apache/iceberg/ValidationHelpers.java b/spark/v2.4/spark/src/test/java/org/apache/iceberg/ValidationHelpers.java
new file mode 100644
index 0000000000..70ab04f0a0
--- /dev/null
+++ b/spark/v2.4/spark/src/test/java/org/apache/iceberg/ValidationHelpers.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.assertj.core.api.Assertions;
+
+public class ValidationHelpers {
+
+  private ValidationHelpers() {}
+
+  public static List<Long> dataSeqs(Long... seqs) {
+    return Arrays.asList(seqs);
+  }
+
+  public static List<Long> fileSeqs(Long... seqs) {
+    return Arrays.asList(seqs);
+  }
+
+  public static List<Long> snapshotIds(Long... ids) {
+    return Arrays.asList(ids);
+  }
+
+  public static List<String> files(ContentFile<?>... files) {
+    return Arrays.stream(files).map(file -> file.path().toString()).collect(Collectors.toList());
+  }
+
+  public static void validateDataManifest(
+      Table table,
+      ManifestFile manifest,
+      List<Long> dataSeqs,
+      List<Long> fileSeqs,
+      List<Long> snapshotIds,
+      List<String> files) {
+
+    List<Long> actualDataSeqs = Lists.newArrayList();
+    List<Long> actualFileSeqs = Lists.newArrayList();
+    List<Long> actualSnapshotIds = Lists.newArrayList();
+    List<String> actualFiles = Lists.newArrayList();
+
+    for (ManifestEntry<DataFile> entry : ManifestFiles.read(manifest, table.io()).entries()) {
+      actualDataSeqs.add(entry.dataSequenceNumber());
+      actualFileSeqs.add(entry.fileSequenceNumber());
+      actualSnapshotIds.add(entry.snapshotId());
+      actualFiles.add(entry.file().path().toString());
+    }
+
+    assertSameElements("data seqs", actualDataSeqs, dataSeqs);
+    assertSameElements("file seqs", actualFileSeqs, fileSeqs);
+    assertSameElements("snapshot IDs", actualSnapshotIds, snapshotIds);
+    assertSameElements("files", actualFiles, files);
+  }
+
+  private static <T> void assertSameElements(String context, List<T> actual, List<T> expected) {
+    String errorMessage = String.format("%s must match", context);
+    Assertions.assertThat(actual).as(errorMessage).hasSameElementsAs(expected);
+  }
+}
diff --git a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index 4b50ea0c29..95f2f12d5f 100644
--- a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++ b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -18,6 +18,11 @@
  */
 package org.apache.iceberg.spark.actions;
 
+import static org.apache.iceberg.ValidationHelpers.dataSeqs;
+import static org.apache.iceberg.ValidationHelpers.fileSeqs;
+import static org.apache.iceberg.ValidationHelpers.files;
+import static org.apache.iceberg.ValidationHelpers.snapshotIds;
+import static org.apache.iceberg.ValidationHelpers.validateDataManifest;
 import static org.apache.iceberg.types.Types.NestedField.optional;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.spy;
@@ -29,6 +34,7 @@ import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DataFile;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -38,6 +44,7 @@ import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.actions.RewriteManifests;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -471,6 +478,67 @@ public class TestRewriteManifestsAction extends SparkTestBase {
     Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
   }
 
+  @Test
+  public void testRewriteSmallManifestsNonPartitionedV2Table() {
+    PartitionSpec spec = PartitionSpec.unpartitioned();
+    Map<String, String> properties = ImmutableMap.of(TableProperties.FORMAT_VERSION, "2");
+    Table table = TABLES.create(SCHEMA, spec, properties, tableLocation);
+
+    List<ThreeColumnRecord> records1 = Lists.newArrayList(new ThreeColumnRecord(1, null, "AAAA"));
+    writeRecords(records1);
+
+    table.refresh();
+
+    Snapshot snapshot1 = table.currentSnapshot();
+    DataFile file1 = Iterables.getOnlyElement(snapshot1.addedDataFiles(table.io()));
+
+    List<ThreeColumnRecord> records2 = Lists.newArrayList(new ThreeColumnRecord(2, "CCCC", "CCCC"));
+    writeRecords(records2);
+
+    table.refresh();
+
+    Snapshot snapshot2 = table.currentSnapshot();
+    DataFile file2 = Iterables.getOnlyElement(snapshot2.addedDataFiles(table.io()));
+
+    List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
+    Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());
+
+    SparkActions actions = SparkActions.get();
+    RewriteManifests.Result result = actions.rewriteManifests(table).execute();
+    Assert.assertEquals(
+        "Action should rewrite 2 manifests", 2, Iterables.size(result.rewrittenManifests()));
+    Assert.assertEquals(
+        "Action should add 1 manifests", 1, Iterables.size(result.addedManifests()));
+
+    table.refresh();
+
+    List<ManifestFile> newManifests = table.currentSnapshot().allManifests(table.io());
+    Assert.assertEquals("Should have 1 manifests after rewrite", 1, newManifests.size());
+
+    ManifestFile newManifest = Iterables.getOnlyElement(newManifests);
+    Assert.assertEquals(2, (long) newManifest.existingFilesCount());
+    Assert.assertFalse(newManifest.hasAddedFiles());
+    Assert.assertFalse(newManifest.hasDeletedFiles());
+
+    validateDataManifest(
+        table,
+        newManifest,
+        dataSeqs(1L, 2L),
+        fileSeqs(1L, 2L),
+        snapshotIds(snapshot1.snapshotId(), snapshot2.snapshotId()),
+        files(file1, file2));
+
+    List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
+    expectedRecords.addAll(records1);
+    expectedRecords.addAll(records2);
+
+    Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
+    List<ThreeColumnRecord> actualRecords =
+        resultDF.sort("c1", "c2").as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();
+
+    Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+  }
+
   private void writeRecords(List<ThreeColumnRecord> records) {
     Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class);
     writeDF(df);