Posted to commits@iceberg.apache.org by yu...@apache.org on 2023/05/26 17:30:39 UTC

[iceberg] branch master updated: Enable extra commit properties with metadata delete (#7649)

This is an automated email from the ASF dual-hosted git repository.

yufei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 893af4a198 Enable extra commit properties with metadata delete (#7649)
893af4a198 is described below

commit 893af4a19841ae23e18b1e2196df9176d9d90bc2
Author: Nan Zhu <Co...@users.noreply.github.com>
AuthorDate: Fri May 26 10:30:32 2023 -0700

    Enable extra commit properties with metadata delete (#7649)
---
 .../apache/iceberg/spark/source/SparkTable.java    | 18 +++++--
 .../spark/source/TestDataSourceOptions.java        | 55 ++++++++++++++++++----
 .../apache/iceberg/spark/source/SparkTable.java    |  5 +-
 .../spark/source/TestDataSourceOptions.java        | 55 ++++++++++++++++++----
 .../apache/iceberg/spark/source/SparkTable.java    |  5 ++
 .../spark/source/TestDataSourceOptions.java        | 55 ++++++++++++++++++----
 6 files changed, 157 insertions(+), 36 deletions(-)
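
The patch below touches the SparkTable delete path for Spark 3.2, 3.3 and 3.4 so that properties registered through CommitMetadata are also applied when a DELETE is executed as a metadata-only operation. A minimal sketch of the user-facing pattern the new tests exercise, assuming an Iceberg table reachable as demo.db.tbl and an illustrative "writer-thread" key (both are placeholders, not names fixed by the API):

    import java.util.Map;
    import org.apache.iceberg.relocated.com.google.common.collect.Maps;
    import org.apache.iceberg.spark.CommitMetadata;
    import org.apache.spark.sql.SparkSession;

    public class ExtraCommitPropertiesSketch {
      // Attaches extra commit properties to the snapshot produced by a DELETE.
      // "demo.db.tbl" and "writer-thread" are placeholder names.
      static void deleteWithExtraProperties(SparkSession spark) {
        Map<String, String> properties = Maps.newHashMap();
        properties.put("writer-thread", Thread.currentThread().getName());

        CommitMetadata.withCommitProperties(
            properties,
            () -> {
              // When the predicate matches whole files, SparkTable#deleteWhere commits a
              // metadata-only delete; with this patch the properties above are copied onto it.
              spark.sql("DELETE FROM demo.db.tbl WHERE id = 1");
              return 0;
            },
            RuntimeException.class);
      }
    }

Before this change, deleteWhere built and committed the delete without consulting CommitMetadata.commitProperties(), so keys set this way only reached snapshots produced by the regular write path.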

diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index ff982fe96f..ec17bd0aac 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -27,6 +27,7 @@ import java.util.Set;
 import org.apache.iceberg.BaseMetadataTable;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFiles;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.PartitionSpec;
@@ -48,6 +49,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.CommitMetadata;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkFilters;
 import org.apache.iceberg.spark.SparkReadOptions;
@@ -326,11 +328,17 @@ public class SparkTable
       return;
     }
 
-    icebergTable
-        .newDelete()
-        .set("spark.app.id", sparkSession().sparkContext().applicationId())
-        .deleteFromRowFilter(deleteExpr)
-        .commit();
+    DeleteFiles deleteFiles =
+        icebergTable
+            .newDelete()
+            .set("spark.app.id", sparkSession().sparkContext().applicationId())
+            .deleteFromRowFilter(deleteExpr);
+
+    if (!CommitMetadata.commitProperties().isEmpty()) {
+      CommitMetadata.commitProperties().forEach(deleteFiles::set);
+    }
+
+    deleteFiles.commit();
   }
 
   @Override
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 60dd716c63..06e4965a06 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -24,7 +24,6 @@ import java.io.IOException;
 import java.math.RoundingMode;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
@@ -40,18 +39,20 @@ import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.relocated.com.google.common.math.LongMath;
 import org.apache.iceberg.spark.CommitMetadata;
 import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -59,7 +60,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-public class TestDataSourceOptions {
+public class TestDataSourceOptions extends SparkTestBaseWithCatalog {
 
   private static final Configuration CONF = new Configuration();
   private static final Schema SCHEMA =
@@ -440,12 +441,46 @@ public class TestDataSourceOptions {
     writerThread.setName("test-extra-commit-message-writer-thread");
     writerThread.start();
     writerThread.join();
-    Set<String> threadNames = Sets.newHashSet();
-    for (Snapshot snapshot : table.snapshots()) {
-      threadNames.add(snapshot.summary().get("writer-thread"));
-    }
-    Assert.assertEquals(2, threadNames.size());
-    Assert.assertTrue(threadNames.contains(null));
-    Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread"));
+
+    List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
+    Assert.assertEquals(2, snapshots.size());
+    Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
+    Assert.assertEquals(
+        "test-extra-commit-message-writer-thread", snapshots.get(1).summary().get("writer-thread"));
+  }
+
+  @Test
+  public void testExtraSnapshotMetadataWithDelete()
+      throws InterruptedException, NoSuchTableException {
+    spark.sessionState().conf().setConfString("spark.sql.shuffle.partitions", "1");
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    List<SimpleRecord> expectedRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.repartition(5, new Column("data")).select("id", "data").writeTo(tableName).append();
+    Thread writerThread =
+        new Thread(
+            () -> {
+              Map<String, String> properties = Maps.newHashMap();
+              properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+              CommitMetadata.withCommitProperties(
+                  properties,
+                  () -> {
+                    spark.sql("DELETE FROM " + tableName + " where id = 1");
+                    return 0;
+                  },
+                  RuntimeException.class);
+            });
+    writerThread.setName("test-extra-commit-message-delete-thread");
+    writerThread.start();
+    writerThread.join();
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
+    Assert.assertEquals(2, snapshots.size());
+    Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
+    Assert.assertEquals(
+        "test-extra-commit-message-delete-thread", snapshots.get(1).summary().get("writer-thread"));
   }
 }
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index d845284513..8fb583123c 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -52,6 +52,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.CommitMetadata;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkFilters;
 import org.apache.iceberg.spark.SparkReadOptions;
@@ -374,7 +375,9 @@ public class SparkTable
     if (branch != null) {
       deleteFiles.toBranch(branch);
     }
-
+    if (!CommitMetadata.commitProperties().isEmpty()) {
+      CommitMetadata.commitProperties().forEach(deleteFiles::set);
+    }
     deleteFiles.commit();
   }
 
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 9f4eab5bb9..342d8085b1 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -24,7 +24,6 @@ import java.io.IOException;
 import java.math.RoundingMode;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
@@ -40,18 +39,20 @@ import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.relocated.com.google.common.math.LongMath;
 import org.apache.iceberg.spark.CommitMetadata;
 import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -59,7 +60,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-public class TestDataSourceOptions {
+public class TestDataSourceOptions extends SparkTestBaseWithCatalog {
 
   private static final Configuration CONF = new Configuration();
   private static final Schema SCHEMA =
@@ -440,12 +441,46 @@ public class TestDataSourceOptions {
     writerThread.setName("test-extra-commit-message-writer-thread");
     writerThread.start();
     writerThread.join();
-    Set<String> threadNames = Sets.newHashSet();
-    for (Snapshot snapshot : table.snapshots()) {
-      threadNames.add(snapshot.summary().get("writer-thread"));
-    }
-    Assert.assertEquals(2, threadNames.size());
-    Assert.assertTrue(threadNames.contains(null));
-    Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread"));
+
+    List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
+    Assert.assertEquals(2, snapshots.size());
+    Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
+    Assert.assertEquals(
+        "test-extra-commit-message-writer-thread", snapshots.get(1).summary().get("writer-thread"));
+  }
+
+  @Test
+  public void testExtraSnapshotMetadataWithDelete()
+      throws InterruptedException, NoSuchTableException {
+    spark.sessionState().conf().setConfString("spark.sql.shuffle.partitions", "1");
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    List<SimpleRecord> expectedRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.repartition(5, new Column("data")).select("id", "data").writeTo(tableName).append();
+    Thread writerThread =
+        new Thread(
+            () -> {
+              Map<String, String> properties = Maps.newHashMap();
+              properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+              CommitMetadata.withCommitProperties(
+                  properties,
+                  () -> {
+                    spark.sql("DELETE FROM " + tableName + " where id = 1");
+                    return 0;
+                  },
+                  RuntimeException.class);
+            });
+    writerThread.setName("test-extra-commit-message-delete-thread");
+    writerThread.start();
+    writerThread.join();
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
+    Assert.assertEquals(2, snapshots.size());
+    Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
+    Assert.assertEquals(
+        "test-extra-commit-message-delete-thread", snapshots.get(1).summary().get("writer-thread"));
   }
 }
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index d845284513..240a9df0a8 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -52,6 +52,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.CommitMetadata;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkFilters;
 import org.apache.iceberg.spark.SparkReadOptions;
@@ -375,6 +376,10 @@ public class SparkTable
       deleteFiles.toBranch(branch);
     }
 
+    if (!CommitMetadata.commitProperties().isEmpty()) {
+      CommitMetadata.commitProperties().forEach(deleteFiles::set);
+    }
+
     deleteFiles.commit();
   }
 
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index a14e7b500e..5e819200f5 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -24,7 +24,6 @@ import java.io.IOException;
 import java.math.RoundingMode;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
@@ -39,18 +38,20 @@ import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.relocated.com.google.common.math.LongMath;
 import org.apache.iceberg.spark.CommitMetadata;
 import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
 import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.assertj.core.api.Assertions;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -59,7 +60,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-public class TestDataSourceOptions {
+public class TestDataSourceOptions extends SparkTestBaseWithCatalog {
 
   private static final Configuration CONF = new Configuration();
   private static final Schema SCHEMA =
@@ -437,12 +438,46 @@ public class TestDataSourceOptions {
     writerThread.setName("test-extra-commit-message-writer-thread");
     writerThread.start();
     writerThread.join();
-    Set<String> threadNames = Sets.newHashSet();
-    for (Snapshot snapshot : table.snapshots()) {
-      threadNames.add(snapshot.summary().get("writer-thread"));
-    }
-    Assert.assertEquals(2, threadNames.size());
-    Assert.assertTrue(threadNames.contains(null));
-    Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread"));
+
+    List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
+    Assert.assertEquals(2, snapshots.size());
+    Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
+    Assert.assertEquals(
+        "test-extra-commit-message-writer-thread", snapshots.get(1).summary().get("writer-thread"));
+  }
+
+  @Test
+  public void testExtraSnapshotMetadataWithDelete()
+      throws InterruptedException, NoSuchTableException {
+    spark.sessionState().conf().setConfString("spark.sql.shuffle.partitions", "1");
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    List<SimpleRecord> expectedRecords =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+    Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+    originalDf.repartition(5, new Column("data")).select("id", "data").writeTo(tableName).append();
+    Thread writerThread =
+        new Thread(
+            () -> {
+              Map<String, String> properties = Maps.newHashMap();
+              properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+              CommitMetadata.withCommitProperties(
+                  properties,
+                  () -> {
+                    spark.sql("DELETE FROM " + tableName + " where id = 1");
+                    return 0;
+                  },
+                  RuntimeException.class);
+            });
+    writerThread.setName("test-extra-commit-message-delete-thread");
+    writerThread.start();
+    writerThread.join();
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
+    Assert.assertEquals(2, snapshots.size());
+    Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
+    Assert.assertEquals(
+        "test-extra-commit-message-delete-thread", snapshots.get(1).summary().get("writer-thread"));
   }
 }
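
For the read side, the new assertions inspect each snapshot's summary map directly instead of collecting thread names into a set. A small sketch of that check, assuming a Table already loaded from a catalog (the "writer-thread" key is the same illustrative name used above):

    import java.util.List;
    import org.apache.iceberg.Snapshot;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.relocated.com.google.common.collect.Lists;

    public class SnapshotSummarySketch {
      // Prints the custom "writer-thread" entry from each snapshot summary; snapshots
      // committed outside withCommitProperties have no such entry and print null.
      static void printWriterThreads(Table table) {
        List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
        for (Snapshot snapshot : snapshots) {
          System.out.println(
              snapshot.snapshotId() + " -> " + snapshot.summary().get("writer-thread"));
        }
      }
    }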