You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2023/02/16 05:07:14 UTC

[iceberg] branch master updated: Spark 3.2: Make manifest file names unique during imports (#6845)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c4fd98e15 Spark 3.2: Make manifest file names unique during imports (#6845)
8c4fd98e15 is described below

commit 8c4fd98e15756bd1cbd913b7366bf31fa84c53ec
Author: Abid Mohammed <69...@users.noreply.github.com>
AuthorDate: Wed Feb 15 21:07:09 2023 -0800

    Spark 3.2: Make manifest file names unique during imports (#6845)
    
    Backports PR #6818 to Spark 3.2.
---
 .../spark/extensions/TestAddFilesProcedure.java    | 35 ++++++++++++++++++++++
 .../org/apache/iceberg/spark/SparkTableUtil.java   |  5 +++-
 2 files changed, 39 insertions(+), 1 deletion(-)

diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
index f689401653..5707ecbe75 100644
--- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
@@ -22,6 +22,8 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
@@ -32,6 +34,7 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumWriter;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.spark.sql.Dataset;
@@ -394,6 +397,38 @@ public class TestAddFilesProcedure extends SparkExtensionsTestBase {
         sql("SELECT id, name, dept, subdept FROM %s ORDER BY id", tableName));
   }
 
+  @Test
+  public void addPartitionToPartitionedSnapshotIdInheritanceEnabledInTwoRuns() {
+    createPartitionedFileTable("parquet");
+
+    String createIceberg =
+        "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)"
+            + "TBLPROPERTIES ('%s'='true')";
+
+    sql(createIceberg, tableName, TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED);
+
+    sql(
+        "CALL %s.system.add_files('%s', '`parquet`.`%s`', map('id', 1))",
+        catalogName, tableName, fileTableDir.getAbsolutePath());
+
+    sql(
+        "CALL %s.system.add_files('%s', '`parquet`.`%s`', map('id', 2))",
+        catalogName, tableName, fileTableDir.getAbsolutePath());
+
+    assertEquals(
+        "Iceberg table contains correct data",
+        sql("SELECT id, name, dept, subdept FROM %s WHERE id < 3 ORDER BY id", sourceTableName),
+        sql("SELECT id, name, dept, subdept FROM %s ORDER BY id", tableName));
+
+    // verify manifest file name has uuid pattern
+    String manifestPath = (String) sql("select path from %s.manifests", tableName).get(0)[0];
+
+    Pattern uuidPattern = Pattern.compile("[a-f0-9]{8}(?:-[a-f0-9]{4}){4}[a-f0-9]{8}");
+
+    Matcher matcher = uuidPattern.matcher(manifestPath);
+    Assert.assertTrue("verify manifest path has uuid", matcher.find());
+  }
+
   @Test
   public void addDataPartitionedByDateToPartitioned() {
     createDatePartitionedFileTable("parquet");
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
index 3b7e063d99..4bfff5b2c4 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -27,6 +27,7 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -372,7 +373,9 @@ public class SparkTableUtil {
       FileIO io = new HadoopFileIO(conf.get());
       TaskContext ctx = TaskContext.get();
       String suffix =
-          String.format("stage-%d-task-%d-manifest", ctx.stageId(), ctx.taskAttemptId());
+          String.format(
+              "stage-%d-task-%d-manifest-%s",
+              ctx.stageId(), ctx.taskAttemptId(), UUID.randomUUID());
       Path location = new Path(basePath, suffix);
       String outputPath = FileFormat.AVRO.addExtension(location.toString());
       OutputFile outputFile = io.newOutputFile(outputPath);