Posted to commits@iceberg.apache.org by yu...@apache.org on 2023/05/26 17:30:39 UTC
[iceberg] branch master updated: Enable extra commit properties with metadata delete (#7649)
This is an automated email from the ASF dual-hosted git repository.
yufei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 893af4a198 Enable extra commit properties with metadata delete (#7649)
893af4a198 is described below
commit 893af4a19841ae23e18b1e2196df9176d9d90bc2
Author: Nan Zhu <Co...@users.noreply.github.com>
AuthorDate: Fri May 26 10:30:32 2023 -0700
Enable extra commit properties with metadata delete (#7649)
---
.../apache/iceberg/spark/source/SparkTable.java | 18 +++++--
.../spark/source/TestDataSourceOptions.java | 55 ++++++++++++++++++----
.../apache/iceberg/spark/source/SparkTable.java | 5 +-
.../spark/source/TestDataSourceOptions.java | 55 ++++++++++++++++++----
.../apache/iceberg/spark/source/SparkTable.java | 5 ++
.../spark/source/TestDataSourceOptions.java | 55 ++++++++++++++++++----
6 files changed, 157 insertions(+), 36 deletions(-)
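For context, a minimal sketch of how a caller can attach extra commit properties to a metadata-only DELETE after this change. CommitMetadata.withCommitProperties stores the properties in a thread-local that SparkTable#deleteWhere now copies onto the DeleteFiles operation before committing (the pattern below mirrors the new test; the table name, property key, and the SparkSession named "spark" are illustrative assumptions, not part of the patch):

    import java.util.Map;
    import org.apache.iceberg.relocated.com.google.common.collect.Maps;
    import org.apache.iceberg.spark.CommitMetadata;

    // Illustrative only: "etl-job-id" is a hypothetical property key.
    Map<String, String> props = Maps.newHashMap();
    props.put("etl-job-id", "nightly-1234");

    // Runs the callable with the properties visible to commits made on this
    // thread; a metadata-only DELETE picks them up via SparkTable#deleteWhere.
    CommitMetadata.withCommitProperties(
        props,
        () -> {
          spark.sql("DELETE FROM db.tbl WHERE id = 1"); // hypothetical table
          return 0;
        },
        RuntimeException.class);

The per-version diffs below apply the same fix to Spark 3.2, 3.3, and 3.4.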
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index ff982fe96f..ec17bd0aac 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -27,6 +27,7 @@ import java.util.Set;
import org.apache.iceberg.BaseMetadataTable;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionSpec;
@@ -48,6 +49,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.CommitMetadata;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkFilters;
import org.apache.iceberg.spark.SparkReadOptions;
@@ -326,11 +328,17 @@ public class SparkTable
return;
}
- icebergTable
- .newDelete()
- .set("spark.app.id", sparkSession().sparkContext().applicationId())
- .deleteFromRowFilter(deleteExpr)
- .commit();
+ DeleteFiles deleteFiles =
+ icebergTable
+ .newDelete()
+ .set("spark.app.id", sparkSession().sparkContext().applicationId())
+ .deleteFromRowFilter(deleteExpr);
+
+ if (!CommitMetadata.commitProperties().isEmpty()) {
+ CommitMetadata.commitProperties().forEach(deleteFiles::set);
+ }
+
+ deleteFiles.commit();
}
@Override
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 60dd716c63..06e4965a06 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -24,7 +24,6 @@ import java.io.IOException;
import java.math.RoundingMode;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
@@ -40,18 +39,20 @@ import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.math.LongMath;
import org.apache.iceberg.spark.CommitMetadata;
import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -59,7 +60,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-public class TestDataSourceOptions {
+public class TestDataSourceOptions extends SparkTestBaseWithCatalog {
private static final Configuration CONF = new Configuration();
private static final Schema SCHEMA =
@@ -440,12 +441,46 @@ public class TestDataSourceOptions {
writerThread.setName("test-extra-commit-message-writer-thread");
writerThread.start();
writerThread.join();
- Set<String> threadNames = Sets.newHashSet();
- for (Snapshot snapshot : table.snapshots()) {
- threadNames.add(snapshot.summary().get("writer-thread"));
- }
- Assert.assertEquals(2, threadNames.size());
- Assert.assertTrue(threadNames.contains(null));
- Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread"));
+
+ List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
+ Assert.assertEquals(2, snapshots.size());
+ Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
+ Assert.assertEquals(
+ "test-extra-commit-message-writer-thread", snapshots.get(1).summary().get("writer-thread"));
+ }
+
+ @Test
+ public void testExtraSnapshotMetadataWithDelete()
+ throws InterruptedException, NoSuchTableException {
+ spark.sessionState().conf().setConfString("spark.sql.shuffle.partitions", "1");
+ sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+ List<SimpleRecord> expectedRecords =
+ Lists.newArrayList(
+ new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+ Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+ originalDf.repartition(5, new Column("data")).select("id", "data").writeTo(tableName).append();
+ Thread writerThread =
+ new Thread(
+ () -> {
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+ CommitMetadata.withCommitProperties(
+ properties,
+ () -> {
+ spark.sql("DELETE FROM " + tableName + " where id = 1");
+ return 0;
+ },
+ RuntimeException.class);
+ });
+ writerThread.setName("test-extra-commit-message-delete-thread");
+ writerThread.start();
+ writerThread.join();
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
+ Assert.assertEquals(2, snapshots.size());
+ Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
+ Assert.assertEquals(
+ "test-extra-commit-message-delete-thread", snapshots.get(1).summary().get("writer-thread"));
}
}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index d845284513..8fb583123c 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -52,6 +52,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.CommitMetadata;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkFilters;
import org.apache.iceberg.spark.SparkReadOptions;
@@ -374,7 +375,9 @@ public class SparkTable
if (branch != null) {
deleteFiles.toBranch(branch);
}
-
+ if (!CommitMetadata.commitProperties().isEmpty()) {
+ CommitMetadata.commitProperties().forEach(deleteFiles::set);
+ }
deleteFiles.commit();
}
diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index 9f4eab5bb9..342d8085b1 100644
--- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -24,7 +24,6 @@ import java.io.IOException;
import java.math.RoundingMode;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.DataFile;
@@ -40,18 +39,20 @@ import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.math.LongMath;
import org.apache.iceberg.spark.CommitMetadata;
import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -59,7 +60,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-public class TestDataSourceOptions {
+public class TestDataSourceOptions extends SparkTestBaseWithCatalog {
private static final Configuration CONF = new Configuration();
private static final Schema SCHEMA =
@@ -440,12 +441,46 @@ public class TestDataSourceOptions {
writerThread.setName("test-extra-commit-message-writer-thread");
writerThread.start();
writerThread.join();
- Set<String> threadNames = Sets.newHashSet();
- for (Snapshot snapshot : table.snapshots()) {
- threadNames.add(snapshot.summary().get("writer-thread"));
- }
- Assert.assertEquals(2, threadNames.size());
- Assert.assertTrue(threadNames.contains(null));
- Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread"));
+
+ List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
+ Assert.assertEquals(2, snapshots.size());
+ Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
+ Assert.assertEquals(
+ "test-extra-commit-message-writer-thread", snapshots.get(1).summary().get("writer-thread"));
+ }
+
+ @Test
+ public void testExtraSnapshotMetadataWithDelete()
+ throws InterruptedException, NoSuchTableException {
+ spark.sessionState().conf().setConfString("spark.sql.shuffle.partitions", "1");
+ sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+ List<SimpleRecord> expectedRecords =
+ Lists.newArrayList(
+ new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+ Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+ originalDf.repartition(5, new Column("data")).select("id", "data").writeTo(tableName).append();
+ Thread writerThread =
+ new Thread(
+ () -> {
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+ CommitMetadata.withCommitProperties(
+ properties,
+ () -> {
+ spark.sql("DELETE FROM " + tableName + " where id = 1");
+ return 0;
+ },
+ RuntimeException.class);
+ });
+ writerThread.setName("test-extra-commit-message-delete-thread");
+ writerThread.start();
+ writerThread.join();
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
+ Assert.assertEquals(2, snapshots.size());
+ Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
+ Assert.assertEquals(
+ "test-extra-commit-message-delete-thread", snapshots.get(1).summary().get("writer-thread"));
}
}
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index d845284513..240a9df0a8 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -52,6 +52,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.CommitMetadata;
import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkFilters;
import org.apache.iceberg.spark.SparkReadOptions;
@@ -375,6 +376,10 @@ public class SparkTable
deleteFiles.toBranch(branch);
}
+ if (!CommitMetadata.commitProperties().isEmpty()) {
+ CommitMetadata.commitProperties().forEach(deleteFiles::set);
+ }
+
deleteFiles.commit();
}
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
index a14e7b500e..5e819200f5 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java
@@ -24,7 +24,6 @@ import java.io.IOException;
import java.math.RoundingMode;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
@@ -39,18 +38,20 @@ import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.relocated.com.google.common.math.LongMath;
import org.apache.iceberg.spark.CommitMetadata;
import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -59,7 +60,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
-public class TestDataSourceOptions {
+public class TestDataSourceOptions extends SparkTestBaseWithCatalog {
private static final Configuration CONF = new Configuration();
private static final Schema SCHEMA =
@@ -437,12 +438,46 @@ public class TestDataSourceOptions {
writerThread.setName("test-extra-commit-message-writer-thread");
writerThread.start();
writerThread.join();
- Set<String> threadNames = Sets.newHashSet();
- for (Snapshot snapshot : table.snapshots()) {
- threadNames.add(snapshot.summary().get("writer-thread"));
- }
- Assert.assertEquals(2, threadNames.size());
- Assert.assertTrue(threadNames.contains(null));
- Assert.assertTrue(threadNames.contains("test-extra-commit-message-writer-thread"));
+
+ List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
+ Assert.assertEquals(2, snapshots.size());
+ Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
+ Assert.assertEquals(
+ "test-extra-commit-message-writer-thread", snapshots.get(1).summary().get("writer-thread"));
+ }
+
+ @Test
+ public void testExtraSnapshotMetadataWithDelete()
+ throws InterruptedException, NoSuchTableException {
+ spark.sessionState().conf().setConfString("spark.sql.shuffle.partitions", "1");
+ sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+ List<SimpleRecord> expectedRecords =
+ Lists.newArrayList(
+ new SimpleRecord(1, "a"), new SimpleRecord(2, "b"), new SimpleRecord(3, "c"));
+ Dataset<Row> originalDf = spark.createDataFrame(expectedRecords, SimpleRecord.class);
+ originalDf.repartition(5, new Column("data")).select("id", "data").writeTo(tableName).append();
+ Thread writerThread =
+ new Thread(
+ () -> {
+ Map<String, String> properties = Maps.newHashMap();
+ properties.put("writer-thread", String.valueOf(Thread.currentThread().getName()));
+ CommitMetadata.withCommitProperties(
+ properties,
+ () -> {
+ spark.sql("DELETE FROM " + tableName + " where id = 1");
+ return 0;
+ },
+ RuntimeException.class);
+ });
+ writerThread.setName("test-extra-commit-message-delete-thread");
+ writerThread.start();
+ writerThread.join();
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ List<Snapshot> snapshots = Lists.newArrayList(table.snapshots());
+ Assert.assertEquals(2, snapshots.size());
+ Assert.assertNull(snapshots.get(0).summary().get("writer-thread"));
+ Assert.assertEquals(
+ "test-extra-commit-message-delete-thread", snapshots.get(1).summary().get("writer-thread"));
}
}
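Once the DELETE commits, the extra properties land in the new snapshot's summary, which is what the tests above assert. A short sketch of the verification step, assuming a catalog-loaded Table handle as in the tests (the "writer-thread" key matches the one the tests set):

    import org.apache.iceberg.Snapshot;
    import org.apache.iceberg.Table;

    Table table = validationCatalog.loadTable(tableIdent); // as in the tests above
    Snapshot latest = table.currentSnapshot();
    // The property set inside withCommitProperties appears in the commit summary.
    String writerThread = latest.summary().get("writer-thread");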