Posted to commits@iceberg.apache.org by ao...@apache.org on 2023/02/17 20:55:46 UTC

[iceberg] branch master updated: Spark 3.1: Make manifest file names unique during imports (#6846)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new d67d06a89d Spark 3.1: Make manifest file names unique during imports (#6846)
d67d06a89d is described below

commit d67d06a89d0df8d96ac90b3add9db4b7645e467f
Author: Abid Mohammed <69...@users.noreply.github.com>
AuthorDate: Fri Feb 17 12:55:41 2023 -0800

    Spark 3.1: Make manifest file names unique during imports (#6846)
    
    Backports PR #6818 to Spark 3.1.
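    
    Why the rename matters: Spark stage and task attempt IDs are not
    unique across applications, so two imports could write manifests
    with identical names and overwrite one another. A minimal sketch of
    the naming change (the ctx values stand in for a real TaskContext;
    uniqueSuffix is an illustrative name, not from this commit):
    
        // before: may collide when another run reuses the same IDs
        String suffix = String.format(
            "stage-%d-task-%d-manifest", ctx.stageId(), ctx.taskAttemptId());
        // after: a random UUID makes every written manifest name unique
        String uniqueSuffix = String.format(
            "stage-%d-task-%d-manifest-%s",
            ctx.stageId(), ctx.taskAttemptId(), UUID.randomUUID());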
    
    Co-authored-by: Abid Mohammed <ab...@apple.com>
---
 .../spark/extensions/TestAddFilesProcedure.java    | 42 ++++++++++++++++++++++
 .../org/apache/iceberg/spark/SparkTableUtil.java   |  5 ++-
 2 files changed, 46 insertions(+), 1 deletion(-)

diff --git a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
index 046590cd0d..46963e42ff 100644
--- a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
+++ b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
@@ -24,6 +24,8 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
@@ -35,7 +37,9 @@ import org.apache.avro.io.DatumWriter;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.Files;
+import org.apache.iceberg.MetadataTableType;
 import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.data.orc.GenericOrcWriter;
 import org.apache.iceberg.io.FileAppender;
@@ -403,6 +407,44 @@ public class TestAddFilesProcedure extends SparkExtensionsTestBase {
         sql("SELECT id, name, dept, subdept FROM %s ORDER BY id", tableName));
   }
 
+  @Test
+  public void addPartitionToPartitionedSnapshotIdInheritanceEnabledInTwoRuns() {
+    createPartitionedFileTable("parquet");
+
+    String createIceberg =
+        "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)"
+            + "TBLPROPERTIES ('%s'='true')";
+
+    sql(createIceberg, tableName, TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED);
+
+    sql(
+        "CALL %s.system.add_files('%s', '`parquet`.`%s`', map('id', 1))",
+        catalogName, tableName, fileTableDir.getAbsolutePath());
+
+    sql(
+        "CALL %s.system.add_files('%s', '`parquet`.`%s`', map('id', 2))",
+        catalogName, tableName, fileTableDir.getAbsolutePath());
+
+    assertEquals(
+        "Iceberg table contains correct data",
+        sql("SELECT id, name, dept, subdept FROM %s WHERE id < 3 ORDER BY id", sourceTableName),
+        sql("SELECT id, name, dept, subdept FROM %s ORDER BY id", tableName));
+
+    // verify manifest file name has uuid pattern
+    List<Row> rows =
+        spark
+            .read()
+            .format("iceberg")
+            .load(String.format("%s.%s", tableName, MetadataTableType.MANIFESTS.name()))
+            .select("path")
+            .collectAsList();
+
+    Pattern uuidPattern = Pattern.compile("[a-f0-9]{8}(?:-[a-f0-9]{4}){4}[a-f0-9]{8}");
+
+    Matcher matcher = uuidPattern.matcher((String) rows.get(0).get(0));
+    Assert.assertTrue("verify manifest path has uuid", matcher.find());
+  }
+
   @Test
   public void addFilteredPartitionsToPartitioned() {
     createCompositePartitionedTable("parquet");
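
The test above enables TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED
("compatibility.snapshot-id-inheritance.enabled") because, with
inheritance enabled, imported manifests can be committed as written
rather than rewritten, so a duplicate name from a second run would
clobber a live manifest. The UUID assertion relies on the pattern
[a-f0-9]{8}(?:-[a-f0-9]{4}){4}[a-f0-9]{8}: eight hex characters, four
"-xxxx" groups, then eight more hex characters that extend the fourth
group into the final 12-character segment of an 8-4-4-4-12 UUID. A
standalone sketch, not part of this patch, that exercises the same
pattern against a hypothetical manifest name in the new scheme:

    import java.util.UUID;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class ManifestUuidCheck {
      public static void main(String[] args) {
        // same pattern as the test: a lowercase 8-4-4-4-12 UUID
        Pattern uuidPattern =
            Pattern.compile("[a-f0-9]{8}(?:-[a-f0-9]{4}){4}[a-f0-9]{8}");
        // hypothetical manifest file name using the new naming scheme
        String path = "stage-1-task-42-manifest-" + UUID.randomUUID() + ".avro";
        Matcher matcher = uuidPattern.matcher(path);
        System.out.println(matcher.find()); // prints true
      }
    }
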
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
index c0c1255d20..c1216f47ba 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -27,6 +27,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -378,7 +379,9 @@ public class SparkTableUtil {
       FileIO io = new HadoopFileIO(conf.get());
       TaskContext ctx = TaskContext.get();
       String suffix =
-          String.format("stage-%d-task-%d-manifest", ctx.stageId(), ctx.taskAttemptId());
+          String.format(
+              "stage-%d-task-%d-manifest-%s",
+              ctx.stageId(), ctx.taskAttemptId(), UUID.randomUUID());
       Path location = new Path(basePath, suffix);
       String outputPath = FileFormat.AVRO.addExtension(location.toString());
       OutputFile outputFile = io.newOutputFile(outputPath);
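
End to end, each imported manifest now lands at a unique path. A quick
sketch of the resulting name, using a hypothetical base path and IDs
(only FileFormat.AVRO.addExtension and the Path construction come from
the code above; it assumes the UUID, Path, and FileFormat imports
already present in SparkTableUtil):

    // yields something like:
    // /tmp/staging/stage-1-task-42-manifest-3f2504e0-4f89-41d3-9a0c-0305e82c3301.avro
    String suffix = String.format(
        "stage-%d-task-%d-manifest-%s", 1, 42, UUID.randomUUID());
    String outputPath =
        FileFormat.AVRO.addExtension(new Path("/tmp/staging", suffix).toString());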