You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iceberg.apache.org by ao...@apache.org on 2023/05/23 05:21:13 UTC

[iceberg] branch master updated: Spark 3.3: Add RewritePositionDeleteFilesProcedure (#7687)

This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 711780b6cc Spark 3.3: Add RewritePositionDeleteFilesProcedure (#7687)
711780b6cc is described below

commit 711780b6cc835f3401cf8570072cc44ac03047cc
Author: Szehon Ho <sz...@gmail.com>
AuthorDate: Mon May 22 22:21:07 2023 -0700

    Spark 3.3: Add RewritePositionDeleteFilesProcedure (#7687)
    
    This commit backports PR #7572 to Spark 3.3.
---
 .../TestRewritePositionDeleteFilesProcedure.java   | 152 +++++++++++++++++++++
 .../RewritePositionDeleteFilesProcedure.java       | 109 +++++++++++++++
 .../iceberg/spark/procedures/SparkProcedures.java  |   1 +
 3 files changed, 262 insertions(+)

diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
new file mode 100644
index 0000000000..88715fd89c
--- /dev/null
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.apache.iceberg.SnapshotSummary.ADDED_FILE_SIZE_PROP;
+import static org.apache.iceberg.SnapshotSummary.REMOVED_FILE_SIZE_PROP;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.data.TestHelpers;
+import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.spark.sql.Encoders;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for the {@code rewrite_position_delete_files} stored procedure.
+ *
+ * <p>Each test creates a format-version 2, merge-on-read table so that row-level
+ * DELETE statements produce position delete files, then invokes the procedure and
+ * checks the counts and byte sizes it reports against the snapshot summary.
+ */
+public class TestRewritePositionDeleteFilesProcedure extends SparkExtensionsTestBase {
+
+  public TestRewritePositionDeleteFilesProcedure(
+      String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  // Creates the test table ('format-version'='2', 'write.delete.mode'='merge-on-read')
+  // and appends six rows written through a single Spark partition (coalesce(1)),
+  // so each subsequent DELETE adds a position delete file instead of rewriting data.
+  private void createTable() throws Exception {
+    sql(
+        "CREATE TABLE %s (id bigint, data string) USING iceberg TBLPROPERTIES"
+            + "('format-version'='2', 'write.delete.mode'='merge-on-read')",
+        tableName);
+
+    List<SimpleRecord> records =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"),
+            new SimpleRecord(2, "b"),
+            new SimpleRecord(3, "c"),
+            new SimpleRecord(4, "d"),
+            new SimpleRecord(5, "e"),
+            new SimpleRecord(6, "f"));
+    spark
+        .createDataset(records, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  // With options => map('rewrite-all','true'), the two position delete files
+  // produced by the two DELETEs are rewritten into one; the procedure's output
+  // row must report 2 rewritten / 1 added, with byte counts matching the
+  // snapshot summary properties.
+  @Test
+  public void testExpireDeleteFilesAll() throws Exception {
+    createTable();
+
+    sql("DELETE FROM %s WHERE id=1", tableName);
+    sql("DELETE FROM %s WHERE id=2", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertEquals(2, TestHelpers.deleteFiles(table).size());
+
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_position_delete_files("
+                + "table => '%s',"
+                + "options => map("
+                + "'rewrite-all','true'))",
+            catalogName, tableIdent);
+    table.refresh();
+
+    Map<String, String> snapshotSummary = snapshotSummary();
+    assertEquals(
+        "Should delete 2 delete files and add 1",
+        ImmutableList.of(
+            row(
+                2,
+                1,
+                Long.valueOf(snapshotSummary.get(REMOVED_FILE_SIZE_PROP)),
+                Long.valueOf(snapshotSummary.get(ADDED_FILE_SIZE_PROP)))),
+        output);
+
+    Assert.assertEquals(1, TestHelpers.deleteFiles(table).size());
+  }
+
+  // Without any options, five DELETEs yield five position delete files that the
+  // procedure's default rewrite compacts into one (5 rewritten / 1 added).
+  @Test
+  public void testExpireDeleteFilesNoOption() throws Exception {
+    createTable();
+
+    sql("DELETE FROM %s WHERE id=1", tableName);
+    sql("DELETE FROM %s WHERE id=2", tableName);
+    sql("DELETE FROM %s WHERE id=3", tableName);
+    sql("DELETE FROM %s WHERE id=4", tableName);
+    sql("DELETE FROM %s WHERE id=5", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertEquals(5, TestHelpers.deleteFiles(table).size());
+
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_position_delete_files(" + "table => '%s')",
+            catalogName, tableIdent);
+    table.refresh();
+
+    Map<String, String> snapshotSummary = snapshotSummary();
+    assertEquals(
+        "Should replace 5 delete files with 1",
+        ImmutableList.of(
+            row(
+                5,
+                1,
+                Long.valueOf(snapshotSummary.get(REMOVED_FILE_SIZE_PROP)),
+                Long.valueOf(snapshotSummary.get(ADDED_FILE_SIZE_PROP)))),
+        output);
+  }
+
+  // An unrecognized entry in the options map must be rejected with
+  // IllegalArgumentException naming the unsupported key.
+  @Test
+  public void testInvalidOption() throws Exception {
+    createTable();
+
+    Assert.assertThrows(
+        "Cannot use options [foo], they are not supported by the action or the rewriter BIN-PACK",
+        IllegalArgumentException.class,
+        () ->
+            sql(
+                "CALL %s.system.rewrite_position_delete_files("
+                    + "table => '%s',"
+                    + "options => map("
+                    + "'foo', 'bar'))",
+                catalogName, tableIdent));
+  }
+
+  // Summary of the table's current snapshot, used to cross-check the byte
+  // counts (ADDED/REMOVED_FILE_SIZE_PROP) reported by the procedure.
+  private Map<String, String> snapshotSummary() {
+    return validationCatalog.loadTable(tableIdent).currentSnapshot().summary();
+  }
+}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java
new file mode 100644
index 0000000000..a4a3f63ba7
--- /dev/null
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Map;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewritePositionDeleteFiles.Result;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A procedure that rewrites position delete files in a table.
+ *
+ * <p>Accepts a required {@code table} identifier and an optional string-to-string
+ * {@code options} map that is forwarded verbatim to the underlying rewrite action.
+ * Returns a single row with the rewritten/added delete-file counts and byte totals.
+ *
+ * @see org.apache.iceberg.spark.actions.SparkActions#rewritePositionDeletes(Table)
+ */
+public class RewritePositionDeleteFilesProcedure extends BaseProcedure {
+
+  private static final ProcedureParameter TABLE_PARAM =
+      ProcedureParameter.required("table", DataTypes.StringType);
+  private static final ProcedureParameter OPTIONS_PARAM =
+      ProcedureParameter.optional("options", STRING_MAP);
+
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {TABLE_PARAM, OPTIONS_PARAM};
+
+  // Output schema: one row mirroring the action's Result — counts are ints,
+  // byte totals are longs, all columns non-nullable.
+  private static final StructType OUTPUT_TYPE =
+      new StructType(
+          new StructField[] {
+            new StructField(
+                "rewritten_delete_files_count", DataTypes.IntegerType, false, Metadata.empty()),
+            new StructField(
+                "added_delete_files_count", DataTypes.IntegerType, false, Metadata.empty()),
+            new StructField("rewritten_bytes_count", DataTypes.LongType, false, Metadata.empty()),
+            new StructField("added_bytes_count", DataTypes.LongType, false, Metadata.empty())
+          });
+
+  // Registered by SparkProcedures; the builder supplies the session's table catalog.
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new Builder<RewritePositionDeleteFilesProcedure>() {
+      @Override
+      protected RewritePositionDeleteFilesProcedure doBuild() {
+        return new RewritePositionDeleteFilesProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private RewritePositionDeleteFilesProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  // Parses the args, then runs the rewrite via modifyIcebergTable (presumably
+  // handles table resolution and commit semantics in BaseProcedure — confirm there)
+  // and converts the action Result into the single output row.
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
+    Identifier tableIdent = input.ident(TABLE_PARAM);
+    // Missing 'options' defaults to an empty map rather than null.
+    Map<String, String> options = input.asStringMap(OPTIONS_PARAM, ImmutableMap.of());
+
+    return modifyIcebergTable(
+        tableIdent,
+        table -> {
+          Result result = actions().rewritePositionDeletes(table).options(options).execute();
+          return new InternalRow[] {toOutputRow(result)};
+        });
+  }
+
+  // Field order must match OUTPUT_TYPE above.
+  private InternalRow toOutputRow(Result result) {
+    return newInternalRow(
+        result.rewrittenDeleteFilesCount(),
+        result.addedDeleteFilesCount(),
+        result.rewrittenBytesCount(),
+        result.addedBytesCount());
+  }
+
+  @Override
+  public String description() {
+    return "RewritePositionDeleteFilesProcedure";
+  }
+}
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
index 8ee3a95501..7ebbb46c3d 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
@@ -54,6 +54,7 @@ public class SparkProcedures {
     mapBuilder.put("register_table", RegisterTableProcedure::builder);
     mapBuilder.put("publish_changes", PublishChangesProcedure::builder);
     mapBuilder.put("create_changelog_view", CreateChangelogViewProcedure::builder);
+    mapBuilder.put("rewrite_position_delete_files", RewritePositionDeleteFilesProcedure::builder);
     return mapBuilder.build();
   }