Posted to issues@iceberg.apache.org by "szehon-ho (via GitHub)" <gi...@apache.org> on 2023/05/09 21:05:14 UTC

[GitHub] [iceberg] szehon-ho opened a new pull request, #7572: Spark 3.4: Add RewritePositionDeleteFilesProcedure

szehon-ho opened a new pull request, #7572:
URL: https://github.com/apache/iceberg/pull/7572

   Adds a Spark procedure, rewrite_position_delete_files, for #7389.
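
For orientation, here is a minimal sketch of how the CALL statement for this procedure might be assembled in Java. The `table` and `options` argument names and the `rewrite-all` option are taken from the test added in this PR; the class name `RewritePositionDeletesCall`, the catalog `my_catalog`, and the table `db.tbl` are hypothetical. The sketch only builds the SQL string and assumes it would then be passed to a Spark session.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not part of the PR: formats the procedure call as SQL text.
final class RewritePositionDeletesCall {

  // Produces: CALL <catalog>.system.rewrite_position_delete_files(table => '...', options => map(...))
  // Simplification: values are not escaped, so names containing quotes are not handled.
  static String buildCall(String catalog, String table, Map<String, String> options) {
    StringBuilder sql = new StringBuilder();
    sql.append("CALL ").append(catalog).append(".system.rewrite_position_delete_files(");
    sql.append("table => '").append(table).append("'");
    if (!options.isEmpty()) {
      // map('k1', 'v1', 'k2', 'v2', ...) in insertion order
      StringJoiner entries = new StringJoiner(", ", "map(", ")");
      options.forEach((key, value) -> entries.add("'" + key + "', '" + value + "'"));
      sql.append(", options => ").append(entries);
    }
    sql.append(")");
    return sql.toString();
  }

  public static void main(String[] args) {
    Map<String, String> options = new LinkedHashMap<>();
    options.put("rewrite-all", "true");
    System.out.println(buildCall("my_catalog", "db.tbl", options));
  }
}
```

The `options` argument is omitted entirely when the map is empty, which mirrors the parameter being declared optional in the procedure.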


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org


[GitHub] [iceberg] aokolnychyi merged pull request #7572: Spark 3.4: Add RewritePositionDeleteFilesProcedure

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi merged PR #7572:
URL: https://github.com/apache/iceberg/pull/7572




[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7572: Spark 3.4: Add RewritePositionDeleteFilesProcedure

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7572:
URL: https://github.com/apache/iceberg/pull/7572#discussion_r1194377787


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Map;
+import org.apache.iceberg.actions.RewritePositionDeleteFiles;
+import org.apache.iceberg.actions.RewritePositionDeleteFiles.Result;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+public class RewritePositionDeleteFilesProcedure extends BaseProcedure {
+
+  private static final ProcedureParameter TABLE_PARAM =
+      ProcedureParameter.required("table", DataTypes.StringType);
+  private static final ProcedureParameter OPTIONS_PARAM =
+      ProcedureParameter.optional("options", STRING_MAP);
+
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {TABLE_PARAM, OPTIONS_PARAM};
+
+  // counts are not nullable since the action result is never null

Review Comment:
   Removed



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Map;
+import org.apache.iceberg.actions.RewritePositionDeleteFiles;
+import org.apache.iceberg.actions.RewritePositionDeleteFiles.Result;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+public class RewritePositionDeleteFilesProcedure extends BaseProcedure {
+
+  private static final ProcedureParameter TABLE_PARAM =
+      ProcedureParameter.required("table", DataTypes.StringType);
+  private static final ProcedureParameter OPTIONS_PARAM =
+      ProcedureParameter.optional("options", STRING_MAP);
+
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {TABLE_PARAM, OPTIONS_PARAM};
+
+  // counts are not nullable since the action result is never null
+  private static final StructType OUTPUT_TYPE =
+      new StructType(
+          new StructField[] {
+            new StructField(
+                "rewritten_delete_files_count", DataTypes.IntegerType, false, Metadata.empty()),
+            new StructField(
+                "added_delete_files_count", DataTypes.IntegerType, false, Metadata.empty()),
+            new StructField("rewritten_bytes_count", DataTypes.LongType, false, Metadata.empty()),
+            new StructField("added_bytes_count", DataTypes.LongType, false, Metadata.empty())
+          });
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new Builder<RewritePositionDeleteFilesProcedure>() {
+      @Override
+      protected RewritePositionDeleteFilesProcedure doBuild() {
+        return new RewritePositionDeleteFilesProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private RewritePositionDeleteFilesProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
+
+    Identifier tableIdent = input.ident(TABLE_PARAM);
+
+    Map<String, String> options = input.asStringMap(OPTIONS_PARAM, ImmutableMap.of());

Review Comment:
   Removed





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7572: Spark 3.4: Add RewritePositionDeleteFilesProcedure

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7572:
URL: https://github.com/apache/iceberg/pull/7572#discussion_r1194625864


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Map;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewritePositionDeleteFiles;
+import org.apache.iceberg.actions.RewritePositionDeleteFiles.Result;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A procedure that rewrites position delete files in a table.
+ *
+ * @see org.apache.iceberg.spark.actions.SparkActions#rewritePositionDeletes(Table)
+ */
+public class RewritePositionDeleteFilesProcedure extends BaseProcedure {
+
+  private static final ProcedureParameter TABLE_PARAM =
+      ProcedureParameter.required("table", DataTypes.StringType);
+  private static final ProcedureParameter OPTIONS_PARAM =
+      ProcedureParameter.optional("options", STRING_MAP);
+
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {TABLE_PARAM, OPTIONS_PARAM};
+
+  private static final StructType OUTPUT_TYPE =
+      new StructType(
+          new StructField[] {
+            new StructField(
+                "rewritten_delete_files_count", DataTypes.IntegerType, false, Metadata.empty()),
+            new StructField(
+                "added_delete_files_count", DataTypes.IntegerType, false, Metadata.empty()),
+            new StructField("rewritten_bytes_count", DataTypes.LongType, false, Metadata.empty()),
+            new StructField("added_bytes_count", DataTypes.LongType, false, Metadata.empty())
+          });
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new Builder<RewritePositionDeleteFilesProcedure>() {
+      @Override
+      protected RewritePositionDeleteFilesProcedure doBuild() {
+        return new RewritePositionDeleteFilesProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private RewritePositionDeleteFilesProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
+    Identifier tableIdent = input.ident(TABLE_PARAM);
+    Map<String, String> options = input.asStringMap(OPTIONS_PARAM, ImmutableMap.of());
+
+    return modifyIcebergTable(
+        tableIdent,
+        table -> {
+          RewritePositionDeleteFiles action =
+              actions().rewritePositionDeletes(table).options(options);
+          Result result = action.execute();
+
+          return toOutputRows(result);
+        });
+  }
+
+  private InternalRow[] toOutputRows(Result result) {

Review Comment:
   Optional: Another way to write it.
   
   ```
   private InternalRow toOutputRow(Result result) {
     return newInternalRow(
         result.rewrittenDeleteFilesCount(),
         result.rewrittenBytesCount(),
         result.addedDeleteFilesCount(),
         result.addedBytesCount());
   }
   ```
   
   ```
   ...
   return new InternalRow[] {toOutputRow(result)};
   ```
   
   Up to you, though.





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7572: Spark 3.4: Add RewritePositionDeleteFilesProcedure

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7572:
URL: https://github.com/apache/iceberg/pull/7572#discussion_r1190491402


##########
spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.apache.iceberg.SnapshotSummary.ADDED_FILE_SIZE_PROP;
+import static org.apache.iceberg.SnapshotSummary.REMOVED_FILE_SIZE_PROP;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.data.TestHelpers;
+import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.spark.sql.Encoders;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestRewritePositionDeleteFilesProcedure extends SparkExtensionsTestBase {
+
+  public TestRewritePositionDeleteFilesProcedure(
+      String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  private void createTable() throws Exception {
+    sql(
+        "CREATE TABLE %s (id bigint, data string) USING iceberg TBLPROPERTIES"
+            + "('format-version'='2', 'write.delete.mode'='merge-on-read')",
+        tableName);
+
+    List<SimpleRecord> records =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"),
+            new SimpleRecord(2, "b"),
+            new SimpleRecord(3, "c"),
+            new SimpleRecord(4, "d"),
+            new SimpleRecord(5, "e"),
+            new SimpleRecord(6, "f"));
+    spark
+        .createDataset(records, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testExpireDeleteFilesAll() throws Exception {
+    createTable();
+
+    sql("DELETE FROM %s WHERE id=1", tableName);
+    sql("DELETE FROM %s WHERE id=2", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertEquals(2, TestHelpers.deleteFiles(table).size());
+
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_position_delete_files("
+                + "table => '%s',"
+                + "options => map("
+                + "'rewrite-all','true'))",
+            catalogName, tableIdent);
+    table.refresh();
+
+    Map<String, String> snapshotSummary = snapshotSummary();
+    assertEquals(
+        "Should delete 2 delete files and add 1",
+        ImmutableList.of(
+            row(
+                2,
+                1,
+                Long.valueOf(snapshotSummary.get(REMOVED_FILE_SIZE_PROP)),
+                Long.valueOf(snapshotSummary.get(ADDED_FILE_SIZE_PROP)))),
+        output);
+
+    Assert.assertEquals(1, TestHelpers.deleteFiles(table).size());

Review Comment:
   I wanted this test to focus on the procedure code, since I have already added a delete file content check in the test of the action itself, TestRewritePositionDeleteFilesAction.





[GitHub] [iceberg] aokolnychyi commented on pull request #7572: Spark 3.4: Add RewritePositionDeleteFilesProcedure

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on PR #7572:
URL: https://github.com/apache/iceberg/pull/7572#issuecomment-1549007416

   Let me take a look as well.




[GitHub] [iceberg] aokolnychyi commented on pull request #7572: Spark 3.4: Add RewritePositionDeleteFilesProcedure

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on PR #7572:
URL: https://github.com/apache/iceberg/pull/7572#issuecomment-1551647951

   Thanks everyone!




[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7572: Spark 3.4: Add RewritePositionDeleteFilesProcedure

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7572:
URL: https://github.com/apache/iceberg/pull/7572#discussion_r1195790530


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Map;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewritePositionDeleteFiles;
+import org.apache.iceberg.actions.RewritePositionDeleteFiles.Result;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A procedure that rewrites position delete files in a table.
+ *
+ * @see org.apache.iceberg.spark.actions.SparkActions#rewritePositionDeletes(Table)
+ */
+public class RewritePositionDeleteFilesProcedure extends BaseProcedure {
+
+  private static final ProcedureParameter TABLE_PARAM =
+      ProcedureParameter.required("table", DataTypes.StringType);
+  private static final ProcedureParameter OPTIONS_PARAM =
+      ProcedureParameter.optional("options", STRING_MAP);
+
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {TABLE_PARAM, OPTIONS_PARAM};
+
+  private static final StructType OUTPUT_TYPE =
+      new StructType(
+          new StructField[] {
+            new StructField(
+                "rewritten_delete_files_count", DataTypes.IntegerType, false, Metadata.empty()),
+            new StructField(
+                "added_delete_files_count", DataTypes.IntegerType, false, Metadata.empty()),
+            new StructField("rewritten_bytes_count", DataTypes.LongType, false, Metadata.empty()),
+            new StructField("added_bytes_count", DataTypes.LongType, false, Metadata.empty())
+          });
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new Builder<RewritePositionDeleteFilesProcedure>() {
+      @Override
+      protected RewritePositionDeleteFilesProcedure doBuild() {
+        return new RewritePositionDeleteFilesProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private RewritePositionDeleteFilesProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
+    Identifier tableIdent = input.ident(TABLE_PARAM);
+    Map<String, String> options = input.asStringMap(OPTIONS_PARAM, ImmutableMap.of());
+
+    return modifyIcebergTable(
+        tableIdent,
+        table -> {
+          RewritePositionDeleteFiles action =
+              actions().rewritePositionDeletes(table).options(options);
+          Result result = action.execute();
+
+          return toOutputRows(result);
+        });
+  }
+
+  private InternalRow[] toOutputRows(Result result) {

Review Comment:
   Yep, I noticed the arguments in your snippet are in the wrong order, but done.
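
To make the ordering point concrete, here is a minimal, self-contained sketch that maps a result to a row in the same order as the `OUTPUT_TYPE` columns (rewritten files, added files, rewritten bytes, added bytes). `FakeResult` is a hypothetical stand-in for `RewritePositionDeleteFiles.Result`, and a plain `Object[]` stands in for the `InternalRow` that `newInternalRow` would build; this is an illustration, not the code that was merged.

```java
import java.util.Arrays;

final class OutputRowOrderSketch {

  // Hypothetical stand-in exposing the four accessors named in the review snippet.
  static final class FakeResult {
    private final int rewrittenDeleteFilesCount;
    private final int addedDeleteFilesCount;
    private final long rewrittenBytesCount;
    private final long addedBytesCount;

    FakeResult(int rewrittenFiles, int addedFiles, long rewrittenBytes, long addedBytes) {
      this.rewrittenDeleteFilesCount = rewrittenFiles;
      this.addedDeleteFilesCount = addedFiles;
      this.rewrittenBytesCount = rewrittenBytes;
      this.addedBytesCount = addedBytes;
    }

    int rewrittenDeleteFilesCount() { return rewrittenDeleteFilesCount; }
    int addedDeleteFilesCount() { return addedDeleteFilesCount; }
    long rewrittenBytesCount() { return rewrittenBytesCount; }
    long addedBytesCount() { return addedBytesCount; }
  }

  // Values must follow the declared column order: both file counts first, then both byte counts.
  static Object[] toOutputRow(FakeResult result) {
    return new Object[] {
      result.rewrittenDeleteFilesCount(),
      result.addedDeleteFilesCount(),
      result.rewrittenBytesCount(),
      result.addedBytesCount()
    };
  }

  public static void main(String[] args) {
    FakeResult result = new FakeResult(2, 1, 2048L, 1024L);
    System.out.println(Arrays.toString(toOutputRow(result)));
  }
}
```

Because the row is positional, swapping the second and third values would still produce a row of the right length, so a mismatch with the schema is easy to miss without a test that checks each column.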





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7572: Spark 3.4: Add RewritePositionDeleteFilesProcedure

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7572:
URL: https://github.com/apache/iceberg/pull/7572#discussion_r1190491051


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java:
##########
@@ -54,6 +54,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
     mapBuilder.put("register_table", RegisterTableProcedure::builder);
     mapBuilder.put("publish_changes", PublishChangesProcedure::builder);
     mapBuilder.put("create_changelog_view", CreateChangelogViewProcedure::builder);
+    mapBuilder.put("rewrite_position_delete_files", RewritePositionDeleteFilesProcedure::builder);

Review Comment:
   What does everyone think? I use RewritePositionDeletes and PositionDeletes a lot in the code for brevity, but I was not sure here, since the other procedure names all include 'files'.





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7572: Spark 3.4: Add RewritePositionDeleteFilesProcedure

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7572:
URL: https://github.com/apache/iceberg/pull/7572#discussion_r1194378271


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java:
##########
@@ -54,6 +54,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
     mapBuilder.put("register_table", RegisterTableProcedure::builder);
     mapBuilder.put("publish_changes", PublishChangesProcedure::builder);
     mapBuilder.put("create_changelog_view", CreateChangelogViewProcedure::builder);
+    mapBuilder.put("rewrite_position_delete_files", RewritePositionDeleteFilesProcedure::builder);

Review Comment:
   Yeah, that's the primary reason why I kept 'files', though I do think it is long. @aokolnychyi, what do you think?





[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #7572: Spark 3.4: Add RewritePositionDeleteFilesProcedure

Posted by "amogh-jahagirdar (via GitHub)" <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #7572:
URL: https://github.com/apache/iceberg/pull/7572#discussion_r1194420542


##########
spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.apache.iceberg.SnapshotSummary.ADDED_FILE_SIZE_PROP;
+import static org.apache.iceberg.SnapshotSummary.REMOVED_FILE_SIZE_PROP;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.data.TestHelpers;
+import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.spark.sql.Encoders;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestRewritePositionDeleteFilesProcedure extends SparkExtensionsTestBase {
+
+  public TestRewritePositionDeleteFilesProcedure(
+      String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  private void createTable() throws Exception {
+    sql(
+        "CREATE TABLE %s (id bigint, data string) USING iceberg TBLPROPERTIES"
+            + "('format-version'='2', 'write.delete.mode'='merge-on-read')",
+        tableName);
+
+    List<SimpleRecord> records =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"),
+            new SimpleRecord(2, "b"),
+            new SimpleRecord(3, "c"),
+            new SimpleRecord(4, "d"),
+            new SimpleRecord(5, "e"),
+            new SimpleRecord(6, "f"));
+    spark
+        .createDataset(records, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testExpireDeleteFilesAll() throws Exception {
+    createTable();

Review Comment:
   Sure that makes sense! 





[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #7572: Spark 3.4: Add RewritePositionDeleteFilesProcedure

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on code in PR #7572:
URL: https://github.com/apache/iceberg/pull/7572#discussion_r1194625029


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java:
##########
@@ -54,6 +54,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
     mapBuilder.put("register_table", RegisterTableProcedure::builder);
     mapBuilder.put("publish_changes", PublishChangesProcedure::builder);
     mapBuilder.put("create_changelog_view", CreateChangelogViewProcedure::builder);
+    mapBuilder.put("rewrite_position_delete_files", RewritePositionDeleteFilesProcedure::builder);

Review Comment:
   No strong opinion. I also like shorter names, but we use the `files` suffix in other procedures, so I'm inclined to keep the name as is.
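
   For context on the naming question, the diff above registers each procedure under a string key that maps to a builder supplier. A minimal, self-contained sketch of that pattern follows. `ProcedureRegistrySketch` and its string-returning suppliers are hypothetical stand-ins, not Iceberg's `SparkProcedures` class; only the four keys come from the diff.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;

// Hypothetical stand-in for a name -> builder registry; not the Iceberg class.
final class ProcedureRegistrySketch {
  private static final Map<String, Supplier<String>> BUILDERS = new LinkedHashMap<>();

  static {
    // Keys taken from the diff above; the suppliers only return labels here.
    BUILDERS.put("register_table", () -> "RegisterTableProcedure");
    BUILDERS.put("publish_changes", () -> "PublishChangesProcedure");
    BUILDERS.put("create_changelog_view", () -> "CreateChangelogViewProcedure");
    BUILDERS.put("rewrite_position_delete_files", () -> "RewritePositionDeleteFilesProcedure");
  }

  // Looks up a builder by exact name and returns null when the name is unknown.
  static String build(String name) {
    Supplier<String> supplier = BUILDERS.get(name);
    return supplier == null ? null : supplier.get();
  }

  // Lists registered names that end with the given suffix, in insertion order.
  static List<String> namesWithSuffix(String suffix) {
    return BUILDERS.keySet().stream()
        .filter(name -> name.endsWith(suffix))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(build("rewrite_position_delete_files"));
    System.out.println(namesWithSuffix("_files"));
  }
}
```

   Because the key is what users type in `CALL catalog.system.<name>(...)`, keeping a consistent suffix across related procedures makes the names easier to guess.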





[GitHub] [iceberg] amogh-jahagirdar commented on a diff in pull request #7572: Spark 3.4: Add RewritePositionDeleteFilesProcedure

Posted by "amogh-jahagirdar (via GitHub)" <gi...@apache.org>.
amogh-jahagirdar commented on code in PR #7572:
URL: https://github.com/apache/iceberg/pull/7572#discussion_r1191754194


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Map;
+import org.apache.iceberg.actions.RewritePositionDeleteFiles;
+import org.apache.iceberg.actions.RewritePositionDeleteFiles.Result;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+public class RewritePositionDeleteFilesProcedure extends BaseProcedure {
+
+  private static final ProcedureParameter TABLE_PARAM =
+      ProcedureParameter.required("table", DataTypes.StringType);
+  private static final ProcedureParameter OPTIONS_PARAM =
+      ProcedureParameter.optional("options", STRING_MAP);
+
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {TABLE_PARAM, OPTIONS_PARAM};
+
+  // counts are not nullable since the action result is never null

Review Comment:
   I think we can remove this comment, but it's up to you.



##########
spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.apache.iceberg.SnapshotSummary.ADDED_FILE_SIZE_PROP;
+import static org.apache.iceberg.SnapshotSummary.REMOVED_FILE_SIZE_PROP;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.data.TestHelpers;
+import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.spark.sql.Encoders;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestRewritePositionDeleteFilesProcedure extends SparkExtensionsTestBase {
+
+  public TestRewritePositionDeleteFilesProcedure(
+      String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  private void createTable() throws Exception {
+    sql(
+        "CREATE TABLE %s (id bigint, data string) USING iceberg TBLPROPERTIES"
+            + "('format-version'='2', 'write.delete.mode'='merge-on-read')",
+        tableName);
+
+    List<SimpleRecord> records =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"),
+            new SimpleRecord(2, "b"),
+            new SimpleRecord(3, "c"),
+            new SimpleRecord(4, "d"),
+            new SimpleRecord(5, "e"),
+            new SimpleRecord(6, "f"));
+    spark
+        .createDataset(records, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testExpireDeleteFilesAll() throws Exception {
+    createTable();

Review Comment:
   Can we move the `createTable()` call to a `@Before` method?



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Map;
+import org.apache.iceberg.actions.RewritePositionDeleteFiles;
+import org.apache.iceberg.actions.RewritePositionDeleteFiles.Result;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+public class RewritePositionDeleteFilesProcedure extends BaseProcedure {

Review Comment:
   I know it's somewhat self-explanatory, but could we add a class-level Javadoc similar to the one on `RewriteDataFilesProcedure`, linking to the action?



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Map;
+import org.apache.iceberg.actions.RewritePositionDeleteFiles;
+import org.apache.iceberg.actions.RewritePositionDeleteFiles.Result;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+public class RewritePositionDeleteFilesProcedure extends BaseProcedure {
+
+  private static final ProcedureParameter TABLE_PARAM =
+      ProcedureParameter.required("table", DataTypes.StringType);
+  private static final ProcedureParameter OPTIONS_PARAM =
+      ProcedureParameter.optional("options", STRING_MAP);
+
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {TABLE_PARAM, OPTIONS_PARAM};
+
+  // counts are not nullable since the action result is never null
+  private static final StructType OUTPUT_TYPE =
+      new StructType(
+          new StructField[] {
+            new StructField(
+                "rewritten_delete_files_count", DataTypes.IntegerType, false, Metadata.empty()),
+            new StructField(
+                "added_delete_files_count", DataTypes.IntegerType, false, Metadata.empty()),
+            new StructField("rewritten_bytes_count", DataTypes.LongType, false, Metadata.empty()),
+            new StructField("added_bytes_count", DataTypes.LongType, false, Metadata.empty())
+          });
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new Builder<RewritePositionDeleteFilesProcedure>() {
+      @Override
+      protected RewritePositionDeleteFilesProcedure doBuild() {
+        return new RewritePositionDeleteFilesProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private RewritePositionDeleteFilesProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
+
+    Identifier tableIdent = input.ident(TABLE_PARAM);
+
+    Map<String, String> options = input.asStringMap(OPTIONS_PARAM, ImmutableMap.of());

Review Comment:
   Nit: I think the blank lines at 82 and 84 here are a bit unnecessary (though the one before the `modifyIcebergTable` call makes sense).



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java:
##########
@@ -54,6 +54,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
     mapBuilder.put("register_table", RegisterTableProcedure::builder);
     mapBuilder.put("publish_changes", PublishChangesProcedure::builder);
     mapBuilder.put("create_changelog_view", CreateChangelogViewProcedure::builder);
+    mapBuilder.put("rewrite_position_delete_files", RewritePositionDeleteFilesProcedure::builder);

Review Comment:
   Normally I'd go for shorter names, but I see that other keys in the map already have the `files` suffix, so I think it's fine as it is.





[GitHub] [iceberg] ajantha-bhat commented on a diff in pull request #7572: Spark 3.4: Add RewritePositionDeleteFilesProcedure

Posted by "ajantha-bhat (via GitHub)" <gi...@apache.org>.
ajantha-bhat commented on code in PR #7572:
URL: https://github.com/apache/iceberg/pull/7572#discussion_r1189378280


##########
spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.apache.iceberg.SnapshotSummary.ADDED_FILE_SIZE_PROP;
+import static org.apache.iceberg.SnapshotSummary.REMOVED_FILE_SIZE_PROP;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.data.TestHelpers;
+import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.spark.sql.Encoders;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestRewritePositionDeleteFilesProcedure extends SparkExtensionsTestBase {
+
+  public TestRewritePositionDeleteFilesProcedure(
+      String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  private void createTable() throws Exception {
+    sql(
+        "CREATE TABLE %s (id bigint, data string) USING iceberg TBLPROPERTIES"
+            + "('format-version'='2', 'write.delete.mode'='merge-on-read')",
+        tableName);
+
+    List<SimpleRecord> records =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"),
+            new SimpleRecord(2, "b"),
+            new SimpleRecord(3, "c"),
+            new SimpleRecord(4, "d"),
+            new SimpleRecord(5, "e"),
+            new SimpleRecord(6, "f"));
+    spark
+        .createDataset(records, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testExpireDeleteFilesAll() throws Exception {
+    createTable();
+
+    sql("DELETE FROM %s WHERE id=1", tableName);
+    sql("DELETE FROM %s WHERE id=2", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertEquals(2, TestHelpers.deleteFiles(table).size());
+
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_position_delete_files("
+                + "table => '%s',"
+                + "options => map("
+                + "'rewrite-all','true'))",
+            catalogName, tableIdent);
+    table.refresh();
+
+    Map<String, String> snapshotSummary = snapshotSummary();
+    assertEquals(
+        "Should delete 2 delete files and add 1",
+        ImmutableList.of(
+            row(
+                2,
+                1,
+                Long.valueOf(snapshotSummary.get(REMOVED_FILE_SIZE_PROP)),
+                Long.valueOf(snapshotSummary.get(ADDED_FILE_SIZE_PROP)))),
+        output);
+
+    Assert.assertEquals(1, TestHelpers.deleteFiles(table).size());
+  }
+
+  @Test
+  public void testExpireDeleteFilesNoOption() throws Exception {
+    createTable();
+
+    sql("DELETE FROM %s WHERE id=1", tableName);
+    sql("DELETE FROM %s WHERE id=2", tableName);
+    sql("DELETE FROM %s WHERE id=3", tableName);
+    sql("DELETE FROM %s WHERE id=4", tableName);
+    sql("DELETE FROM %s WHERE id=5", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertEquals(5, TestHelpers.deleteFiles(table).size());
+
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_position_delete_files(" + "table => '%s')",
+            catalogName, tableIdent);
+    table.refresh();
+
+    Map<String, String> snapshotSummary = snapshotSummary();
+    assertEquals(
+        "Should replace 5 delete files with 1",
+        ImmutableList.of(
+            row(
+                5,
+                1,
+                Long.valueOf(snapshotSummary.get(REMOVED_FILE_SIZE_PROP)),
+                Long.valueOf(snapshotSummary.get(ADDED_FILE_SIZE_PROP)))),
+        output);
+  }
+
+  @Test
+  public void testInvalidOption() throws Exception {
+    createTable();
+
+    Assert.assertThrows(
+        "Cannot use options [foo], they are not supported by the action or the rewriter BIN-PACK",
+        IllegalArgumentException.class,
+        () ->
+            sql(
+                "CALL %s.system.rewrite_position_delete_files("
+                    + "table => '%s',"
+                    + "options => map("
+                    + "'foo', 'bar'))",
+                catalogName, tableIdent));
+  }
+
+  private Map<String, String> snapshotSummary() {
+    return validationCatalog.loadTable(tableIdent).currentSnapshot().summary();
+  }
+}

Review Comment:
   Can we also add a test case with dangling deletes?
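
   For readers unfamiliar with the term: a position delete records a data file path plus a row position, and it is "dangling" when the data file it points to is no longer live in the table. The toy model below only illustrates that idea. `DanglingDeleteSketch`, `PositionDelete`, and `dropDangling` are hypothetical names, and this is not how the Iceberg action is implemented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of position deletes; not Iceberg's implementation.
final class DanglingDeleteSketch {
  // A position delete: the data file it targets and the row position in that file.
  static final class PositionDelete {
    final String dataFile;
    final long pos;

    PositionDelete(String dataFile, long pos) {
      this.dataFile = dataFile;
      this.pos = pos;
    }
  }

  // Keeps only the deletes whose target data file is still live.
  static List<PositionDelete> dropDangling(List<PositionDelete> deletes, Set<String> liveFiles) {
    List<PositionDelete> kept = new ArrayList<>();
    for (PositionDelete delete : deletes) {
      if (liveFiles.contains(delete.dataFile)) {
        kept.add(delete);
      }
    }
    return kept;
  }

  public static void main(String[] args) {
    List<PositionDelete> deletes =
        Arrays.asList(
            new PositionDelete("a.parquet", 0L),
            new PositionDelete("b.parquet", 3L), // assume b.parquet is no longer live
            new PositionDelete("a.parquet", 5L));
    Set<String> live = new HashSet<>(Arrays.asList("a.parquet"));
    System.out.println(dropDangling(deletes, live).size());
  }
}
```

   A procedure-level test along these lines would need a table where some delete files still reference data files that have since been removed or rewritten.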





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7572: Spark 3.4: Add RewritePositionDeleteFilesProcedure

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7572:
URL: https://github.com/apache/iceberg/pull/7572#discussion_r1190489954


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Map;
+import org.apache.iceberg.actions.RewritePositionDeleteFiles;
+import org.apache.iceberg.actions.RewritePositionDeleteFiles.Result;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+public class RewritePositionDeleteFilesProcedure extends BaseProcedure {
+
+  private static final ProcedureParameter TABLE_PARAM =
+      ProcedureParameter.required("table", DataTypes.StringType);
+  private static final ProcedureParameter OPTIONS_PARAM =
+      ProcedureParameter.optional("options", STRING_MAP);
+
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {TABLE_PARAM, OPTIONS_PARAM};
+
+  // counts are not nullable since the action result is never null
+  private static final StructType OUTPUT_TYPE =
+      new StructType(
+          new StructField[] {
+            new StructField(
+                "rewritten_delete_files_count", DataTypes.IntegerType, false, Metadata.empty()),
+            new StructField(
+                "added_delete_files_count", DataTypes.IntegerType, false, Metadata.empty()),
+            new StructField("rewritten_bytes_count", DataTypes.LongType, false, Metadata.empty()),
+            new StructField("added_bytes_count", DataTypes.LongType, false, Metadata.empty())
+          });
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new Builder<RewritePositionDeleteFilesProcedure>() {
+      @Override
+      protected RewritePositionDeleteFilesProcedure doBuild() {
+        return new RewritePositionDeleteFilesProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private RewritePositionDeleteFilesProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
+
+    Identifier tableIdent = input.ident(TABLE_PARAM);
+
+    Map<String, String> options = input.asStringMap(OPTIONS_PARAM, ImmutableMap.of());
+
+    return modifyIcebergTable(
+        tableIdent,
+        table -> {
+          RewritePositionDeleteFiles action =
+              actions().rewritePositionDeletes(table).options(options);
+          Result result = action.execute();
+
+          return toOutputRows(result);
+        });
+  }
+
+  private InternalRow[] toOutputRows(Result result) {
+    int rewrittenDataFilesCount = result.rewrittenDeleteFilesCount();
+    long rewrittenBytesCount = result.rewrittenBytesCount();
+    int addedDataFilesCount = result.addedDeleteFilesCount();

Review Comment:
   Thanks for the catch
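
   The thread does not spell out what was caught. One plausible reading is that the quoted locals are named `...DataFilesCount` while holding delete-file counts. Under that assumption, a sketch with names that match the output schema could look like the following. `OutputRowSketch` and `ResultSketch` are hypothetical stand-ins for the procedure and the action result; only the column order comes from `OUTPUT_TYPE` in the diff.

```java
import java.util.Arrays;

// Hypothetical stand-in types; only the column order comes from the diff above.
final class OutputRowSketch {
  // Minimal holder for the four values the procedure reports.
  static final class ResultSketch {
    final int rewrittenDeleteFilesCount;
    final int addedDeleteFilesCount;
    final long rewrittenBytesCount;
    final long addedBytesCount;

    ResultSketch(
        int rewrittenDeleteFilesCount,
        int addedDeleteFilesCount,
        long rewrittenBytesCount,
        long addedBytesCount) {
      this.rewrittenDeleteFilesCount = rewrittenDeleteFilesCount;
      this.addedDeleteFilesCount = addedDeleteFilesCount;
      this.rewrittenBytesCount = rewrittenBytesCount;
      this.addedBytesCount = addedBytesCount;
    }
  }

  // Builds one row in the column order declared by OUTPUT_TYPE:
  // rewritten files, added files, rewritten bytes, added bytes.
  static Object[] toOutputRow(ResultSketch result) {
    int rewrittenDeleteFilesCount = result.rewrittenDeleteFilesCount;
    int addedDeleteFilesCount = result.addedDeleteFilesCount;
    long rewrittenBytesCount = result.rewrittenBytesCount;
    long addedBytesCount = result.addedBytesCount;
    return new Object[] {
      rewrittenDeleteFilesCount, addedDeleteFilesCount, rewrittenBytesCount, addedBytesCount
    };
  }

  public static void main(String[] args) {
    Object[] row = toOutputRow(new ResultSketch(2, 1, 1200L, 700L));
    System.out.println(Arrays.toString(row));
  }
}
```

   Naming the locals after the columns they feed makes it harder to swap two values of the same type by accident.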





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7572: Spark 3.4: Add RewritePositionDeleteFilesProcedure

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7572:
URL: https://github.com/apache/iceberg/pull/7572#discussion_r1194377561


##########
spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.apache.iceberg.SnapshotSummary.ADDED_FILE_SIZE_PROP;
+import static org.apache.iceberg.SnapshotSummary.REMOVED_FILE_SIZE_PROP;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.data.TestHelpers;
+import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.spark.sql.Encoders;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestRewritePositionDeleteFilesProcedure extends SparkExtensionsTestBase {
+
+  public TestRewritePositionDeleteFilesProcedure(
+      String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  private void createTable() throws Exception {
+    sql(
+        "CREATE TABLE %s (id bigint, data string) USING iceberg TBLPROPERTIES"
+            + "('format-version'='2', 'write.delete.mode'='merge-on-read')",
+        tableName);
+
+    List<SimpleRecord> records =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"),
+            new SimpleRecord(2, "b"),
+            new SimpleRecord(3, "c"),
+            new SimpleRecord(4, "d"),
+            new SimpleRecord(5, "e"),
+            new SimpleRecord(6, "f"));
+    spark
+        .createDataset(records, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testExpireDeleteFilesAll() throws Exception {
+    createTable();

Review Comment:
   Is it OK to keep this manual for now? I plan to add a filter at some point, so I will probably add a test for another kind of table (such as a partitioned table). In my experience, it becomes awkward to support tests for different kinds of tables if the `@Before` method already creates one kind of table.
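
   The trade-off can be sketched without JUnit: a helper that takes the table layout as an argument lets each test choose its own table, which a single fixed setup method would not. `TableSetupSketch`, `createTableSql`, and the `PARTITIONED BY (id)` clause below are hypothetical and only build SQL strings; they are not part of this PR.

```java
// Hypothetical helper that builds CREATE TABLE statements for different layouts.
final class TableSetupSketch {
  // Table properties copied from the createTable() helper in the diff above.
  private static final String PROPS =
      "('format-version'='2', 'write.delete.mode'='merge-on-read')";

  // Returns the DDL for an unpartitioned or a partitioned test table.
  static String createTableSql(String tableName, boolean partitioned) {
    String partitionClause = partitioned ? " PARTITIONED BY (id)" : "";
    return String.format(
        "CREATE TABLE %s (id bigint, data string) USING iceberg%s TBLPROPERTIES %s",
        tableName, partitionClause, PROPS);
  }

  public static void main(String[] args) {
    System.out.println(createTableSql("db.tbl", false));
    System.out.println(createTableSql("db.tbl", true));
  }
}
```

   Each test would then call the helper with the layout it needs, instead of inheriting whatever table a shared setup method created.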



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Map;
+import org.apache.iceberg.actions.RewritePositionDeleteFiles;
+import org.apache.iceberg.actions.RewritePositionDeleteFiles.Result;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+public class RewritePositionDeleteFilesProcedure extends BaseProcedure {

Review Comment:
   Added





[GitHub] [iceberg] ajantha-bhat commented on a diff in pull request #7572: Spark 3.4: Add RewritePositionDeleteFilesProcedure

Posted by "ajantha-bhat (via GitHub)" <gi...@apache.org>.
ajantha-bhat commented on code in PR #7572:
URL: https://github.com/apache/iceberg/pull/7572#discussion_r1189274474


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Map;
+import org.apache.iceberg.actions.RewritePositionDeleteFiles;
+import org.apache.iceberg.actions.RewritePositionDeleteFiles.Result;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+public class RewritePositionDeleteFilesProcedure extends BaseProcedure {
+
+  private static final ProcedureParameter TABLE_PARAM =
+      ProcedureParameter.required("table", DataTypes.StringType);
+  private static final ProcedureParameter OPTIONS_PARAM =
+      ProcedureParameter.optional("options", STRING_MAP);
+
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {TABLE_PARAM, OPTIONS_PARAM};
+
+  // counts are not nullable since the action result is never null
+  private static final StructType OUTPUT_TYPE =
+      new StructType(
+          new StructField[] {
+            new StructField(
+                "rewritten_delete_files_count", DataTypes.IntegerType, false, Metadata.empty()),
+            new StructField(
+                "added_delete_files_count", DataTypes.IntegerType, false, Metadata.empty()),
+            new StructField("rewritten_bytes_count", DataTypes.LongType, false, Metadata.empty()),
+            new StructField("added_bytes_count", DataTypes.LongType, false, Metadata.empty())
+          });
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new Builder<RewritePositionDeleteFilesProcedure>() {
+      @Override
+      protected RewritePositionDeleteFilesProcedure doBuild() {
+        return new RewritePositionDeleteFilesProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private RewritePositionDeleteFilesProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
+
+    Identifier tableIdent = input.ident(TABLE_PARAM);
+
+    Map<String, String> options = input.asStringMap(OPTIONS_PARAM, ImmutableMap.of());
+
+    return modifyIcebergTable(
+        tableIdent,
+        table -> {
+          RewritePositionDeleteFiles action =
+              actions().rewritePositionDeletes(table).options(options);
+          Result result = action.execute();
+
+          return toOutputRows(result);
+        });
+  }
+
+  private InternalRow[] toOutputRows(Result result) {
+    int rewrittenDataFilesCount = result.rewrittenDeleteFilesCount();

Review Comment:
   ```suggestion
       int rewrittenDeleteFilesCount = result.rewrittenDeleteFilesCount();
   ```



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Map;
+import org.apache.iceberg.actions.RewritePositionDeleteFiles;
+import org.apache.iceberg.actions.RewritePositionDeleteFiles.Result;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+public class RewritePositionDeleteFilesProcedure extends BaseProcedure {
+
+  private static final ProcedureParameter TABLE_PARAM =
+      ProcedureParameter.required("table", DataTypes.StringType);
+  private static final ProcedureParameter OPTIONS_PARAM =
+      ProcedureParameter.optional("options", STRING_MAP);
+
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {TABLE_PARAM, OPTIONS_PARAM};
+
+  // counts are not nullable since the action result is never null
+  private static final StructType OUTPUT_TYPE =
+      new StructType(
+          new StructField[] {
+            new StructField(
+                "rewritten_delete_files_count", DataTypes.IntegerType, false, Metadata.empty()),
+            new StructField(
+                "added_delete_files_count", DataTypes.IntegerType, false, Metadata.empty()),
+            new StructField("rewritten_bytes_count", DataTypes.LongType, false, Metadata.empty()),
+            new StructField("added_bytes_count", DataTypes.LongType, false, Metadata.empty())
+          });
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new Builder<RewritePositionDeleteFilesProcedure>() {
+      @Override
+      protected RewritePositionDeleteFilesProcedure doBuild() {
+        return new RewritePositionDeleteFilesProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private RewritePositionDeleteFilesProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
+
+    Identifier tableIdent = input.ident(TABLE_PARAM);
+
+    Map<String, String> options = input.asStringMap(OPTIONS_PARAM, ImmutableMap.of());
+
+    return modifyIcebergTable(
+        tableIdent,
+        table -> {
+          RewritePositionDeleteFiles action =
+              actions().rewritePositionDeletes(table).options(options);
+          Result result = action.execute();
+
+          return toOutputRows(result);
+        });
+  }
+
+  private InternalRow[] toOutputRows(Result result) {
+    int rewrittenDataFilesCount = result.rewrittenDeleteFilesCount();
+    long rewrittenBytesCount = result.rewrittenBytesCount();
+    int addedDataFilesCount = result.addedDeleteFilesCount();

Review Comment:
   ```suggestion
       int addedDeleteFilesCount = result.addedDeleteFilesCount();
   ```



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java:
##########
@@ -54,6 +54,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
     mapBuilder.put("register_table", RegisterTableProcedure::builder);
     mapBuilder.put("publish_changes", PublishChangesProcedure::builder);
     mapBuilder.put("create_changelog_view", CreateChangelogViewProcedure::builder);
+    mapBuilder.put("rewrite_position_delete_files", RewritePositionDeleteFilesProcedure::builder);

Review Comment:
   We can also add documentation for this procedure here:
   https://github.com/apache/iceberg/blob/master/docs/spark-procedures.md
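A documentation entry would presumably show how the procedure is invoked. As a rough sketch only, the helper below builds the same `CALL` text that the tests in this PR pass to `sql(...)`. The class name, the `callSql` helper, and the `my_catalog` / `db.tbl` names are hypothetical, the procedure name is the one registered in this diff and may change, and no quote escaping is attempted.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper for illustration; it is not part of Iceberg.
final class RewritePositionDeletesCall {

  // Builds the CALL statement text. The options clause is omitted when the map is empty,
  // mirroring the "no option" test in this PR. Values are not escaped.
  static String callSql(String catalog, String table, Map<String, String> options) {
    StringBuilder sql = new StringBuilder();
    sql.append(
        String.format(
            "CALL %s.system.rewrite_position_delete_files(table => '%s'", catalog, table));
    if (!options.isEmpty()) {
      String entries =
          options.entrySet().stream()
              .map(e -> String.format("'%s','%s'", e.getKey(), e.getValue()))
              .collect(Collectors.joining(","));
      sql.append(String.format(", options => map(%s)", entries));
    }
    return sql.append(")").toString();
  }

  public static void main(String[] args) {
    Map<String, String> options = new LinkedHashMap<>();
    options.put("rewrite-all", "true");
    // Placeholder catalog and table names, chosen only for the example.
    System.out.println(callSql("my_catalog", "db.tbl", options));
    System.out.println(callSql("my_catalog", "db.tbl", new LinkedHashMap<>()));
  }
}
```

The resulting string is what a user would run through Spark SQL; the four output columns are the ones declared in `OUTPUT_TYPE`.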



##########
spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.apache.iceberg.SnapshotSummary.ADDED_FILE_SIZE_PROP;
+import static org.apache.iceberg.SnapshotSummary.REMOVED_FILE_SIZE_PROP;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.data.TestHelpers;
+import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.spark.sql.Encoders;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestRewritePositionDeleteFilesProcedure extends SparkExtensionsTestBase {
+
+  public TestRewritePositionDeleteFilesProcedure(
+      String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  private void createTable() throws Exception {
+    sql(
+        "CREATE TABLE %s (id bigint, data string) USING iceberg TBLPROPERTIES"
+            + "('format-version'='2', 'write.delete.mode'='merge-on-read')",
+        tableName);
+
+    List<SimpleRecord> records =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"),
+            new SimpleRecord(2, "b"),
+            new SimpleRecord(3, "c"),
+            new SimpleRecord(4, "d"),
+            new SimpleRecord(5, "e"),
+            new SimpleRecord(6, "f"));
+    spark
+        .createDataset(records, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testExpireDeleteFilesAll() throws Exception {
+    createTable();
+
+    sql("DELETE FROM %s WHERE id=1", tableName);
+    sql("DELETE FROM %s WHERE id=2", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertEquals(2, TestHelpers.deleteFiles(table).size());
+
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_position_delete_files("
+                + "table => '%s',"
+                + "options => map("
+                + "'rewrite-all','true'))",
+            catalogName, tableIdent);
+    table.refresh();
+
+    Map<String, String> snapshotSummary = snapshotSummary();
+    assertEquals(
+        "Should delete 2 delete files and add 1",
+        ImmutableList.of(
+            row(
+                2,
+                1,
+                Long.valueOf(snapshotSummary.get(REMOVED_FILE_SIZE_PROP)),
+                Long.valueOf(snapshotSummary.get(ADDED_FILE_SIZE_PROP)))),
+        output);
+
+    Assert.assertEquals(1, TestHelpers.deleteFiles(table).size());

Review Comment:
   nit: Should we also validate the contents of the new delete file (i.e., whether it really contains all the deletes from the rewritten files)?
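To make the suggested check concrete, here is a minimal sketch of the idea, assuming each position delete can be reduced to a (data file path, row position) pair. It uses plain Java collections as stand-ins; the class and method names are hypothetical, and reading real delete files through Iceberg is deliberately left out.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in types: each delete file is modeled as a list of
// (data file path, row position) entries. Not the Iceberg API.
final class DeleteContentCheck {

  // Collects every position delete from a group of delete files into one set.
  static Set<Map.Entry<String, Long>> union(List<List<Map.Entry<String, Long>>> deleteFiles) {
    Set<Map.Entry<String, Long>> all = new HashSet<>();
    for (List<Map.Entry<String, Long>> file : deleteFiles) {
      all.addAll(file);
    }
    return all;
  }

  // True when the rewritten files hold exactly the deletes of the original files.
  static boolean sameContents(
      List<List<Map.Entry<String, Long>>> before, List<List<Map.Entry<String, Long>>> after) {
    return union(before).equals(union(after));
  }

  public static void main(String[] args) {
    // Two single-row delete files before the rewrite, one combined file after it.
    List<List<Map.Entry<String, Long>>> before =
        List.of(
            List.of(Map.entry("data-1.parquet", 0L)),
            List.of(Map.entry("data-1.parquet", 1L)));
    List<List<Map.Entry<String, Long>>> after =
        List.of(List.of(Map.entry("data-1.parquet", 0L), Map.entry("data-1.parquet", 1L)));
    System.out.println(sameContents(before, after));
  }
}
```

Comparing sets rather than lists keeps the check independent of how the rewritten file orders its rows.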



##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java:
##########
@@ -54,6 +54,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
     mapBuilder.put("register_table", RegisterTableProcedure::builder);
     mapBuilder.put("publish_changes", PublishChangesProcedure::builder);
     mapBuilder.put("create_changelog_view", CreateChangelogViewProcedure::builder);
+    mapBuilder.put("rewrite_position_delete_files", RewritePositionDeleteFilesProcedure::builder);

Review Comment:
   nit: maybe shorten the name to `rewrite_position_deletes`





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7572: Spark 3.4: Add RewritePositionDeleteFilesProcedure

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7572:
URL: https://github.com/apache/iceberg/pull/7572#discussion_r1195785671


##########
spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/procedures/RewritePositionDeleteFilesProcedure.java:
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Map;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.actions.RewritePositionDeleteFiles;
+import org.apache.iceberg.actions.RewritePositionDeleteFiles.Result;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * A procedure that rewrites position delete files in a table.
+ *
+ * @see org.apache.iceberg.spark.actions.SparkActions#rewritePositionDeletes(Table)
+ */
+public class RewritePositionDeleteFilesProcedure extends BaseProcedure {
+
+  private static final ProcedureParameter TABLE_PARAM =
+      ProcedureParameter.required("table", DataTypes.StringType);
+  private static final ProcedureParameter OPTIONS_PARAM =
+      ProcedureParameter.optional("options", STRING_MAP);
+
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {TABLE_PARAM, OPTIONS_PARAM};
+
+  private static final StructType OUTPUT_TYPE =
+      new StructType(
+          new StructField[] {
+            new StructField(
+                "rewritten_delete_files_count", DataTypes.IntegerType, false, Metadata.empty()),
+            new StructField(
+                "added_delete_files_count", DataTypes.IntegerType, false, Metadata.empty()),
+            new StructField("rewritten_bytes_count", DataTypes.LongType, false, Metadata.empty()),
+            new StructField("added_bytes_count", DataTypes.LongType, false, Metadata.empty())
+          });
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new Builder<RewritePositionDeleteFilesProcedure>() {
+      @Override
+      protected RewritePositionDeleteFilesProcedure doBuild() {
+        return new RewritePositionDeleteFilesProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private RewritePositionDeleteFilesProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
+    Identifier tableIdent = input.ident(TABLE_PARAM);
+    Map<String, String> options = input.asStringMap(OPTIONS_PARAM, ImmutableMap.of());
+
+    return modifyIcebergTable(
+        tableIdent,
+        table -> {
+          RewritePositionDeleteFiles action =

Review Comment:
   Just noticed I can remove this variable as well and just chain the action call.
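For readers unfamiliar with the pattern, the change amounts to dropping a local variable that is used only once. The sketch below illustrates it with a hypothetical `FakeRewriteAction` stand-in rather than the real `RewritePositionDeleteFiles` interface, so that it compiles on its own; the integer result is a placeholder with no meaning in Iceberg.

```java
import java.util.HashMap;
import java.util.Map;

final class ChainedActionSketch {

  // Hypothetical stand-in for a fluent action; not the Iceberg interface.
  static final class FakeRewriteAction {
    private final Map<String, String> options = new HashMap<>();

    FakeRewriteAction options(Map<String, String> newOptions) {
      options.putAll(newOptions);
      return this; // returning this is what makes chaining possible
    }

    int execute() {
      return options.size(); // placeholder result: number of options applied
    }
  }

  // Before: the action is assigned to a local variable that is used once.
  static int withLocalVariable(Map<String, String> options) {
    FakeRewriteAction action = new FakeRewriteAction().options(options);
    return action.execute();
  }

  // After: the same calls chained, with no intermediate variable.
  static int chained(Map<String, String> options) {
    return new FakeRewriteAction().options(options).execute();
  }

  public static void main(String[] args) {
    Map<String, String> options = new HashMap<>();
    options.put("rewrite-all", "true");
    // Both forms run the same calls in the same order.
    System.out.println(withLocalVariable(options) == chained(options));
  }
}
```

Both forms behave identically; the chained one is only shorter.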





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7572: Spark 3.4: Add RewritePositionDeleteFilesProcedure

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7572:
URL: https://github.com/apache/iceberg/pull/7572#discussion_r1190491789


##########
spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.apache.iceberg.SnapshotSummary.ADDED_FILE_SIZE_PROP;
+import static org.apache.iceberg.SnapshotSummary.REMOVED_FILE_SIZE_PROP;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.data.TestHelpers;
+import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.spark.sql.Encoders;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestRewritePositionDeleteFilesProcedure extends SparkExtensionsTestBase {
+
+  public TestRewritePositionDeleteFilesProcedure(
+      String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  private void createTable() throws Exception {
+    sql(
+        "CREATE TABLE %s (id bigint, data string) USING iceberg TBLPROPERTIES"
+            + "('format-version'='2', 'write.delete.mode'='merge-on-read')",
+        tableName);
+
+    List<SimpleRecord> records =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"),
+            new SimpleRecord(2, "b"),
+            new SimpleRecord(3, "c"),
+            new SimpleRecord(4, "d"),
+            new SimpleRecord(5, "e"),
+            new SimpleRecord(6, "f"));
+    spark
+        .createDataset(records, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testExpireDeleteFilesAll() throws Exception {
+    createTable();
+
+    sql("DELETE FROM %s WHERE id=1", tableName);
+    sql("DELETE FROM %s WHERE id=2", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertEquals(2, TestHelpers.deleteFiles(table).size());
+
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_position_delete_files("
+                + "table => '%s',"
+                + "options => map("
+                + "'rewrite-all','true'))",
+            catalogName, tableIdent);
+    table.refresh();
+
+    Map<String, String> snapshotSummary = snapshotSummary();
+    assertEquals(
+        "Should delete 2 delete files and add 1",
+        ImmutableList.of(
+            row(
+                2,
+                1,
+                Long.valueOf(snapshotSummary.get(REMOVED_FILE_SIZE_PROP)),
+                Long.valueOf(snapshotSummary.get(ADDED_FILE_SIZE_PROP)))),
+        output);
+
+    Assert.assertEquals(1, TestHelpers.deleteFiles(table).size());
+  }
+
+  @Test
+  public void testExpireDeleteFilesNoOption() throws Exception {
+    createTable();
+
+    sql("DELETE FROM %s WHERE id=1", tableName);
+    sql("DELETE FROM %s WHERE id=2", tableName);
+    sql("DELETE FROM %s WHERE id=3", tableName);
+    sql("DELETE FROM %s WHERE id=4", tableName);
+    sql("DELETE FROM %s WHERE id=5", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertEquals(5, TestHelpers.deleteFiles(table).size());
+
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_position_delete_files(" + "table => '%s')",
+            catalogName, tableIdent);
+    table.refresh();
+
+    Map<String, String> snapshotSummary = snapshotSummary();
+    assertEquals(
+        "Should replace 5 delete files with 1",
+        ImmutableList.of(
+            row(
+                5,
+                1,
+                Long.valueOf(snapshotSummary.get(REMOVED_FILE_SIZE_PROP)),
+                Long.valueOf(snapshotSummary.get(ADDED_FILE_SIZE_PROP)))),
+        output);
+  }
+
+  @Test
+  public void testInvalidOption() throws Exception {
+    createTable();
+
+    Assert.assertThrows(
+        "Cannot use options [foo], they are not supported by the action or the rewriter BIN-PACK",
+        IllegalArgumentException.class,
+        () ->
+            sql(
+                "CALL %s.system.rewrite_position_delete_files("
+                    + "table => '%s',"
+                    + "options => map("
+                    + "'foo', 'bar'))",
+                catalogName, tableIdent));
+  }
+
+  private Map<String, String> snapshotSummary() {
+    return validationCatalog.loadTable(tableIdent).currentSnapshot().summary();
+  }
+}

Review Comment:
   Same here: I have already added tests for dangling deletes in TestRewritePositionDeleteFilesAction, and intended this to be a quicker test that validates only the procedure code.





[GitHub] [iceberg] szehon-ho commented on a diff in pull request #7572: Spark 3.4: Add RewritePositionDeleteFilesProcedure

Posted by "szehon-ho (via GitHub)" <gi...@apache.org>.
szehon-ho commented on code in PR #7572:
URL: https://github.com/apache/iceberg/pull/7572#discussion_r1190491402


##########
spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewritePositionDeleteFilesProcedure.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.apache.iceberg.SnapshotSummary.ADDED_FILE_SIZE_PROP;
+import static org.apache.iceberg.SnapshotSummary.REMOVED_FILE_SIZE_PROP;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.data.TestHelpers;
+import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.spark.sql.Encoders;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestRewritePositionDeleteFilesProcedure extends SparkExtensionsTestBase {
+
+  public TestRewritePositionDeleteFilesProcedure(
+      String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  private void createTable() throws Exception {
+    sql(
+        "CREATE TABLE %s (id bigint, data string) USING iceberg TBLPROPERTIES"
+            + "('format-version'='2', 'write.delete.mode'='merge-on-read')",
+        tableName);
+
+    List<SimpleRecord> records =
+        Lists.newArrayList(
+            new SimpleRecord(1, "a"),
+            new SimpleRecord(2, "b"),
+            new SimpleRecord(3, "c"),
+            new SimpleRecord(4, "d"),
+            new SimpleRecord(5, "e"),
+            new SimpleRecord(6, "f"));
+    spark
+        .createDataset(records, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testExpireDeleteFilesAll() throws Exception {
+    createTable();
+
+    sql("DELETE FROM %s WHERE id=1", tableName);
+    sql("DELETE FROM %s WHERE id=2", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Assert.assertEquals(2, TestHelpers.deleteFiles(table).size());
+
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_position_delete_files("
+                + "table => '%s',"
+                + "options => map("
+                + "'rewrite-all','true'))",
+            catalogName, tableIdent);
+    table.refresh();
+
+    Map<String, String> snapshotSummary = snapshotSummary();
+    assertEquals(
+        "Should delete 2 delete files and add 1",
+        ImmutableList.of(
+            row(
+                2,
+                1,
+                Long.valueOf(snapshotSummary.get(REMOVED_FILE_SIZE_PROP)),
+                Long.valueOf(snapshotSummary.get(ADDED_FILE_SIZE_PROP)))),
+        output);
+
+    Assert.assertEquals(1, TestHelpers.deleteFiles(table).size());

Review Comment:
   I wanted this test to cover only the procedure code, as I have already added coverage of the action itself in TestRewritePositionDeleteFilesAction.


