You are viewing a plain text version of this content. The canonical link for it is available in the original HTML version of this page.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2023/02/17 20:55:46 UTC
[iceberg] branch master updated: Spark 3.1: Make manifest file names unique during imports (#6846)
This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new d67d06a89d Spark 3.1: Make manifest file names unique during imports (#6846)
d67d06a89d is described below
commit d67d06a89d0df8d96ac90b3add9db4b7645e467f
Author: Abid Mohammed <69...@users.noreply.github.com>
AuthorDate: Fri Feb 17 12:55:41 2023 -0800
Spark 3.1: Make manifest file names unique during imports (#6846)
Backports PR #6818 to Spark 3.1.
Co-authored-by: Abid Mohammed <ab...@apple.com>
---
.../spark/extensions/TestAddFilesProcedure.java | 42 ++++++++++++++++++++++
.../org/apache/iceberg/spark/SparkTableUtil.java | 5 ++-
2 files changed, 46 insertions(+), 1 deletion(-)
diff --git a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
index 046590cd0d..46963e42ff 100644
--- a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
+++ b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestAddFilesProcedure.java
@@ -24,6 +24,8 @@ import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
@@ -35,7 +37,9 @@ import org.apache.avro.io.DatumWriter;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Files;
+import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetricsConfig;
+import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.orc.GenericOrcWriter;
import org.apache.iceberg.io.FileAppender;
@@ -403,6 +407,44 @@ public class TestAddFilesProcedure extends SparkExtensionsTestBase {
sql("SELECT id, name, dept, subdept FROM %s ORDER BY id", tableName));
}
+ @Test
+ public void addPartitionToPartitionedSnapshotIdInheritanceEnabledInTwoRuns() {
+ createPartitionedFileTable("parquet");
+
+ String createIceberg =
+ "CREATE TABLE %s (id Integer, name String, dept String, subdept String) USING iceberg PARTITIONED BY (id)"
+ + "TBLPROPERTIES ('%s'='true')";
+
+ sql(createIceberg, tableName, TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED);
+
+ sql(
+ "CALL %s.system.add_files('%s', '`parquet`.`%s`', map('id', 1))",
+ catalogName, tableName, fileTableDir.getAbsolutePath());
+
+ sql(
+ "CALL %s.system.add_files('%s', '`parquet`.`%s`', map('id', 2))",
+ catalogName, tableName, fileTableDir.getAbsolutePath());
+
+ assertEquals(
+ "Iceberg table contains correct data",
+ sql("SELECT id, name, dept, subdept FROM %s WHERE id < 3 ORDER BY id", sourceTableName),
+ sql("SELECT id, name, dept, subdept FROM %s ORDER BY id", tableName));
+
+ // verify manifest file name has uuid pattern
+ List<Row> rows =
+ spark
+ .read()
+ .format("iceberg")
+ .load(String.format("%s.%s", tableName, MetadataTableType.MANIFESTS.name()))
+ .select("path")
+ .collectAsList();
+
+ Pattern uuidPattern = Pattern.compile("[a-f0-9]{8}(?:-[a-f0-9]{4}){4}[a-f0-9]{8}");
+
+ Matcher matcher = uuidPattern.matcher((String) rows.get(0).get(0));
+ Assert.assertTrue("verify manifest path has uuid", matcher.find());
+ }
+
@Test
public void addFilteredPartitionsToPartitioned() {
createCompositePartitionedTable("parquet");
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
index c0c1255d20..c1216f47ba 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -27,6 +27,7 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -378,7 +379,9 @@ public class SparkTableUtil {
FileIO io = new HadoopFileIO(conf.get());
TaskContext ctx = TaskContext.get();
String suffix =
- String.format("stage-%d-task-%d-manifest", ctx.stageId(), ctx.taskAttemptId());
+ String.format(
+ "stage-%d-task-%d-manifest-%s",
+ ctx.stageId(), ctx.taskAttemptId(), UUID.randomUUID());
Path location = new Path(basePath, suffix);
String outputPath = FileFormat.AVRO.addExtension(location.toString());
OutputFile outputFile = io.newOutputFile(outputPath);