Posted to commits@iceberg.apache.org by ao...@apache.org on 2022/12/01 17:35:50 UTC
[iceberg] branch master updated: Spark 2.4: Preserve file seq numbers while rewriting manifests (#6289)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new bf8e68b4a7 Spark 2.4: Preserve file seq numbers while rewriting manifests (#6289)
bf8e68b4a7 is described below
commit bf8e68b4a7b4cca187ab0b152da9d5e98d4a45d8
Author: Eduard Tudenhöfner <et...@gmail.com>
AuthorDate: Thu Dec 1 18:35:44 2022 +0100
Spark 2.4: Preserve file seq numbers while rewriting manifests (#6289)
---
.../actions/BaseRewriteManifestsSparkAction.java | 13 ++--
.../java/org/apache/iceberg/ValidationHelpers.java | 77 ++++++++++++++++++++++
.../spark/actions/TestRewriteManifestsAction.java | 68 +++++++++++++++++++
3 files changed, 154 insertions(+), 4 deletions(-)
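Background: before this change, the Spark 2.4 rewrite action carried only snapshot_id and sequence_number (the data sequence number) for each live manifest entry, so rewritten manifests dropped the per-file sequence number that Iceberg v2 tables record. The patch below threads file_sequence_number from the entries metadata table through to ManifestWriter.existing(). As a minimal sketch (not part of the commit; assumes an existing SparkSession named spark and a path-based Hadoop table at tableLocation), the values the action must preserve can be inspected through the same entries metadata table the action reads:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    // Sketch only: load the "entries" metadata table and show the columns
    // the action must carry through a rewrite.
    Dataset<Row> entries =
        spark.read().format("iceberg").load(tableLocation + "#entries");
    entries
        .filter("status < 2") // live entries, matching the filter in the action
        .selectExpr(
            "snapshot_id", "sequence_number", "file_sequence_number", "data_file.file_path")
        .show(false);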
diff --git a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
index c1b9537c5a..4cca7caf6e 100644
--- a/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
+++ b/spark/v2.4/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteManifestsSparkAction.java
@@ -197,12 +197,16 @@ public class BaseRewriteManifestsSparkAction
loadMetadataTable(table, ENTRIES)
.filter("status < 2") // select only live entries
.selectExpr(
- "input_file_name() as manifest", "snapshot_id", "sequence_number", "data_file");
+ "input_file_name() as manifest",
+ "snapshot_id",
+ "sequence_number",
+ "file_sequence_number",
+ "data_file");
Column joinCond = manifestDF.col("manifest").equalTo(manifestEntryDF.col("manifest"));
return manifestEntryDF
.join(manifestDF, joinCond, "left_semi")
- .select("snapshot_id", "sequence_number", "data_file");
+ .select("snapshot_id", "sequence_number", "file_sequence_number", "data_file");
}
private List<ManifestFile> writeManifestsForUnpartitionedTable(
@@ -355,8 +359,9 @@ public class BaseRewriteManifestsSparkAction
Row row = rows.get(index);
long snapshotId = row.getLong(0);
long sequenceNumber = row.getLong(1);
- Row file = row.getStruct(2);
- writer.existing(wrapper.wrap(file), snapshotId, sequenceNumber);
+ Long fileSequenceNumber = row.isNullAt(2) ? null : row.getLong(2);
+ Row file = row.getStruct(3);
+ writer.existing(wrapper.wrap(file), snapshotId, sequenceNumber, fileSequenceNumber);
}
} finally {
writer.close();
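Two details in the hunk above are worth noting. First, file_sequence_number is nullable: it can be unknown for entries written before file sequence numbers were tracked, so the code checks isNullAt before calling getLong, which unboxes a primitive long and throws on a SQL NULL. Second, the four-argument ManifestWriter.existing() overload writes the file sequence number explicitly rather than letting it be re-derived. A tiny self-contained illustration of the null-safe read pattern (hypothetical row layout, not the action's actual schema):

    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;

    public class NullSafeReadSketch {
      public static void main(String[] args) {
        // Column 2 is a nullable BIGINT, mirroring file_sequence_number above.
        Row row = RowFactory.create(1L, 1L, null);
        // row.getLong(2) would throw on the NULL; isNullAt guards the unboxing.
        Long fileSequenceNumber = row.isNullAt(2) ? null : row.getLong(2);
        System.out.println(fileSequenceNumber); // prints "null"
      }
    }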
diff --git a/spark/v2.4/spark/src/test/java/org/apache/iceberg/ValidationHelpers.java b/spark/v2.4/spark/src/test/java/org/apache/iceberg/ValidationHelpers.java
new file mode 100644
index 0000000000..70ab04f0a0
--- /dev/null
+++ b/spark/v2.4/spark/src/test/java/org/apache/iceberg/ValidationHelpers.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.assertj.core.api.Assertions;
+
+public class ValidationHelpers {
+
+ private ValidationHelpers() {}
+
+ public static List<Long> dataSeqs(Long... seqs) {
+ return Arrays.asList(seqs);
+ }
+
+ public static List<Long> fileSeqs(Long... seqs) {
+ return Arrays.asList(seqs);
+ }
+
+ public static List<Long> snapshotIds(Long... ids) {
+ return Arrays.asList(ids);
+ }
+
+ public static List<String> files(ContentFile<?>... files) {
+ return Arrays.stream(files).map(file -> file.path().toString()).collect(Collectors.toList());
+ }
+
+ public static void validateDataManifest(
+ Table table,
+ ManifestFile manifest,
+ List<Long> dataSeqs,
+ List<Long> fileSeqs,
+ List<Long> snapshotIds,
+ List<String> files) {
+
+ List<Long> actualDataSeqs = Lists.newArrayList();
+ List<Long> actualFileSeqs = Lists.newArrayList();
+ List<Long> actualSnapshotIds = Lists.newArrayList();
+ List<String> actualFiles = Lists.newArrayList();
+
+ for (ManifestEntry<DataFile> entry : ManifestFiles.read(manifest, table.io()).entries()) {
+ actualDataSeqs.add(entry.dataSequenceNumber());
+ actualFileSeqs.add(entry.fileSequenceNumber());
+ actualSnapshotIds.add(entry.snapshotId());
+ actualFiles.add(entry.file().path().toString());
+ }
+
+ assertSameElements("data seqs", actualDataSeqs, dataSeqs);
+ assertSameElements("file seqs", actualFileSeqs, fileSeqs);
+ assertSameElements("snapshot IDs", actualSnapshotIds, snapshotIds);
+ assertSameElements("files", actualFiles, files);
+ }
+
+ private static <T> void assertSameElements(String context, List<T> actual, List<T> expected) {
+ String errorMessage = String.format("%s must match", context);
+ Assertions.assertThat(actual).as(errorMessage).hasSameElementsAs(expected);
+ }
+}
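The dataSeqs/fileSeqs/snapshotIds helpers are thin varargs wrappers whose only job is to make the expected values in tests read declaratively. One caveat: AssertJ's hasSameElementsAs compares ignoring order, so validateDataManifest checks which entries a manifest contains rather than their order within it. A quick illustration (sketch only, not part of the commit):

    import java.util.Arrays;
    import org.assertj.core.api.Assertions;

    // Order-insensitive: this assertion passes although the lists differ in order.
    Assertions.assertThat(Arrays.asList(2L, 1L)).hasSameElementsAs(Arrays.asList(1L, 2L));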
diff --git a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index 4b50ea0c29..95f2f12d5f 100644
--- a/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++ b/spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -18,6 +18,11 @@
*/
package org.apache.iceberg.spark.actions;
+import static org.apache.iceberg.ValidationHelpers.dataSeqs;
+import static org.apache.iceberg.ValidationHelpers.fileSeqs;
+import static org.apache.iceberg.ValidationHelpers.files;
+import static org.apache.iceberg.ValidationHelpers.snapshotIds;
+import static org.apache.iceberg.ValidationHelpers.validateDataManifest;
import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
@@ -29,6 +34,7 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
@@ -38,6 +44,7 @@ import org.apache.iceberg.TableProperties;
import org.apache.iceberg.actions.RewriteManifests;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -471,6 +478,67 @@ public class TestRewriteManifestsAction extends SparkTestBase {
Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
}
+ @Test
+ public void testRewriteSmallManifestsNonPartitionedV2Table() {
+ PartitionSpec spec = PartitionSpec.unpartitioned();
+ Map<String, String> properties = ImmutableMap.of(TableProperties.FORMAT_VERSION, "2");
+ Table table = TABLES.create(SCHEMA, spec, properties, tableLocation);
+
+ List<ThreeColumnRecord> records1 = Lists.newArrayList(new ThreeColumnRecord(1, null, "AAAA"));
+ writeRecords(records1);
+
+ table.refresh();
+
+ Snapshot snapshot1 = table.currentSnapshot();
+ DataFile file1 = Iterables.getOnlyElement(snapshot1.addedDataFiles(table.io()));
+
+ List<ThreeColumnRecord> records2 = Lists.newArrayList(new ThreeColumnRecord(2, "CCCC", "CCCC"));
+ writeRecords(records2);
+
+ table.refresh();
+
+ Snapshot snapshot2 = table.currentSnapshot();
+ DataFile file2 = Iterables.getOnlyElement(snapshot2.addedDataFiles(table.io()));
+
+ List<ManifestFile> manifests = table.currentSnapshot().allManifests(table.io());
+ Assert.assertEquals("Should have 2 manifests before rewrite", 2, manifests.size());
+
+ SparkActions actions = SparkActions.get();
+ RewriteManifests.Result result = actions.rewriteManifests(table).execute();
+ Assert.assertEquals(
+ "Action should rewrite 2 manifests", 2, Iterables.size(result.rewrittenManifests()));
+ Assert.assertEquals(
+ "Action should add 1 manifests", 1, Iterables.size(result.addedManifests()));
+
+ table.refresh();
+
+ List<ManifestFile> newManifests = table.currentSnapshot().allManifests(table.io());
+ Assert.assertEquals("Should have 1 manifests after rewrite", 1, newManifests.size());
+
+ ManifestFile newManifest = Iterables.getOnlyElement(newManifests);
+ Assert.assertEquals(2, (long) newManifest.existingFilesCount());
+ Assert.assertFalse(newManifest.hasAddedFiles());
+ Assert.assertFalse(newManifest.hasDeletedFiles());
+
+ validateDataManifest(
+ table,
+ newManifest,
+ dataSeqs(1L, 2L),
+ fileSeqs(1L, 2L),
+ snapshotIds(snapshot1.snapshotId(), snapshot2.snapshotId()),
+ files(file1, file2));
+
+ List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
+ expectedRecords.addAll(records1);
+ expectedRecords.addAll(records2);
+
+ Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
+ List<ThreeColumnRecord> actualRecords =
+ resultDF.sort("c1", "c2").as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();
+
+ Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
+ }
+
private void writeRecords(List<ThreeColumnRecord> records) {
Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class);
writeDF(df);