You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@iceberg.apache.org by "aokolnychyi (via GitHub)" <gi...@apache.org> on 2023/03/02 06:50:39 UTC

[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #6012: Spark 3.3: Add a procedure to generate table changes

aokolnychyi commented on code in PR #6012:
URL: https://github.com/apache/iceberg/pull/6012#discussion_r1122632629


##########
spark/v3.3/spark-extensions/src/main/antlr/org.apache.spark.sql.catalyst.parser.extensions/IcebergSqlExtensions.g4:
##########
@@ -173,6 +174,10 @@ stringMap
     : MAP '(' constant (',' constant)* ')'
     ;
 
+stringArray

Review Comment:
   Looks good.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.ChangelogIterator;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.spark.api.java.function.MapPartitionsFunction;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.jetbrains.annotations.NotNull;
+import scala.runtime.BoxedUnit;
+
+/**
+ * A procedure that creates a view for changed rows.

Review Comment:
   Looks accurate now, thanks for updating!



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.ChangelogIterator;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.spark.api.java.function.MapPartitionsFunction;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.jetbrains.annotations.NotNull;
+import scala.runtime.BoxedUnit;
+
+/**
+ * A procedure that creates a view for changed rows.
+ *
+ * <p>The procedure removes the carry-over rows by default. If you want to keep them, you can set
+ * "remove_carryovers" to be false in the options.
+ *
+ * <p>The procedure doesn't compute the pre/post update images by default. If you want to compute
+ * them, you can set "compute_updates" to be true in the options.
+ *
+ * <p>Carry-over rows are the result of a removal and insertion of the same row within an operation
+ * because of the copy-on-write mechanism. For example, given a file which contains row1 (id=1,
+ * data='a') and row2 (id=2, data='b'). A copy-on-write delete of row2 would require erasing this
+ * file and preserving row1 in a new file. The changelog table would report this as (id=1, data='a',
+ * op='DELETE') and (id=1, data='a', op='INSERT'), despite it not being an actual change to the
+ * table. The procedure finds the carry-over rows and removes them from the result.
+ *
+ * <p>Pre/post update images are converted from a pair of a delete row and an insert row. Identifier
+ * columns are used for determining whether an insert and a delete record refer to the same row. If
+ * the two records share the same values for the identifier columns they are considered to be before
+ * and after states of the same row. You can either set identifier fields in the table schema or
+ * input them as the procedure parameters. Here is an example of pre/post update images with an
+ * identifier column(id). A pair of a delete row and an insert row with the same id:
+ *
+ * <ul>
+ *   <li>(id=1, data='a', op='DELETE')
+ *   <li>(id=1, data='b', op='INSERT')
+ * </ul>
+ *
+ * <p>will be marked as pre/post update images:
+ *
+ * <ul>
+ *   <li>(id=1, data='a', op='UPDATE_BEFORE')
+ *   <li>(id=1, data='b', op='UPDATE_AFTER')
+ * </ul>
+ */
+public class CreateChangelogViewProcedure extends BaseProcedure {
+
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {
+        ProcedureParameter.required("table", DataTypes.StringType), // the only required argument
+        ProcedureParameter.optional("changelog_view", DataTypes.StringType),
+        ProcedureParameter.optional("options", STRING_MAP),
+        ProcedureParameter.optional("compute_updates", DataTypes.BooleanType),
+        ProcedureParameter.optional("remove_carryovers", DataTypes.BooleanType),
+        ProcedureParameter.optional("identifier_columns", STRING_ARRAY),
+      };
+
+  private static final int TABLE_NAME_ORDINAL = 0; // ordinals mirror the positions in PARAMETERS
+  private static final int CHANGELOG_VIEW_NAME_ORDINAL = 1;
+  private static final int OPTIONS_ORDINAL = 2;
+  private static final int COMPUTE_UPDATES_ORDINAL = 3;
+  private static final int REMOVE_CARRYOVERS_ORDINAL = 4;
+  private static final int IDENTIFIER_COLUMNS_ORDINAL = 5;
+
+  private static final StructType OUTPUT_TYPE = // one non-nullable string column
+      new StructType(
+          new StructField[] {
+            new StructField("changelog_view", DataTypes.StringType, false, Metadata.empty())
+          });
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<CreateChangelogViewProcedure>() { // anonymous builder subclass
+      @Override
+      protected CreateChangelogViewProcedure doBuild() {
+        return new CreateChangelogViewProcedure(tableCatalog()); // catalog supplied by the builder
+      }
+    };
+  }
+
+  private CreateChangelogViewProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog); // private constructor: builder() is the visible way to create instances
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS; // the shared static array, returned without copying
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE; // a single "changelog_view" string column
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    Identifier tableIdent =
+        toIdentifier(args.getString(TABLE_NAME_ORDINAL), PARAMETERS[TABLE_NAME_ORDINAL].name());
+
+    // Read data from the table.changes

Review Comment:
   nit: What about `load insert and deletes from the changelog table`?



##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java:
##########
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.ChangelogOperation;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestCreateChangelogViewProcedure extends SparkExtensionsTestBase {
+  private static final String DELETE = ChangelogOperation.DELETE.name();
+  private static final String INSERT = ChangelogOperation.INSERT.name();
+  private static final String UPDATE_BEFORE = ChangelogOperation.UPDATE_BEFORE.name();
+  private static final String UPDATE_AFTER = ChangelogOperation.UPDATE_AFTER.name();
+
+  public TestCreateChangelogViewProcedure(
+      String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Test
+  public void testCustomizedViewName() {
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Snapshot snap1 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
+
+    table.refresh();
+
+    Snapshot snap2 = table.currentSnapshot();
+
+    // changelog_view asks the procedure to register the view under a caller-chosen name
+    sql(
+        "CALL %s.system.create_changelog_view("
+            + "table => '%s',"
+            + "options => map('%s','%s','%s','%s'),"
+            + "changelog_view => '%s')",
+        catalogName,
+        tableName,
+        SparkReadOptions.START_SNAPSHOT_ID,
+        snap1.snapshotId(),
+        SparkReadOptions.END_SNAPSHOT_ID,
+        snap2.snapshotId(),
+        "cdc_view");
+
+    // the view must be queryable under the custom name; sql() already returns a List, so ask it
+    // for its size directly instead of streaming it just to count
+    List<Object[]> changes = sql("select * from %s", "cdc_view");
+    Assert.assertEquals(2, changes.size());
+  }
+
+  @Test
+  public void testNoSnapshotIdInput() {
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap0 = table.currentSnapshot();
+
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+    table.refresh();
+    Snapshot snap1 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    // no snapshot range and no view name are passed: the format string has exactly two
+    // placeholders, so only the catalog and table names are supplied
+    List<Object[]> returns =
+        sql("CALL %s.system.create_changelog_view(table => '%s')", catalogName, tableName);
+
+    // the procedure reports the name of the view it created in its single output column
+    String viewName = (String) returns.get(0)[0];
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", INSERT, 0, snap0.snapshotId()),
+            row(2, "b", INSERT, 1, snap1.snapshotId()),
+            row(-2, "b", INSERT, 2, snap2.snapshotId()),
+            row(2, "b", DELETE, 2, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id", viewName));
+  }
+
+  @Test
+  public void testTimestampsBasedQuery() {
+    // taken before any write so that the first range below starts ahead of every snapshot
+    long beginning = System.currentTimeMillis();
+
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot firstInsert = table.currentSnapshot();
+    long afterFirstInsert = waitUntilAfter(firstInsert.timestampMillis());
+
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+    table.refresh();
+    Snapshot secondInsert = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
+    table.refresh();
+    Snapshot overwrite = table.currentSnapshot();
+    long afterInsertOverwrite = waitUntilAfter(overwrite.timestampMillis());
+
+    // range 1: from before the first write until after the overwrite
+    List<Object[]> fullRangeResult =
+        sql(
+            "CALL %s.system.create_changelog_view(table => '%s', "
+                + "options => map('%s', '%s','%s', '%s'))",
+            catalogName,
+            tableName,
+            SparkReadOptions.START_TIMESTAMP,
+            beginning,
+            SparkReadOptions.END_TIMESTAMP,
+            afterInsertOverwrite);
+
+    List<Object[]> expectedFullRange =
+        ImmutableList.of(
+            row(1, "a", INSERT, 0, firstInsert.snapshotId()),
+            row(2, "b", INSERT, 1, secondInsert.snapshotId()),
+            row(-2, "b", INSERT, 2, overwrite.snapshotId()),
+            row(2, "b", DELETE, 2, overwrite.snapshotId()));
+    Object fullRangeView = fullRangeResult.get(0)[0];
+    assertEquals(
+        "Rows should match",
+        expectedFullRange,
+        sql("select * from %s order by _change_ordinal, id", fullRangeView));
+
+    // range 2: query the timestamps starting from the second insert
+    List<Object[]> partialRangeResult =
+        sql(
+            "CALL %s.system.create_changelog_view(table => '%s', "
+                + "options => map('%s', '%s', '%s', '%s'))",
+            catalogName,
+            tableName,
+            SparkReadOptions.START_TIMESTAMP,
+            afterFirstInsert,
+            SparkReadOptions.END_TIMESTAMP,
+            afterInsertOverwrite);
+
+    List<Object[]> expectedPartialRange =
+        ImmutableList.of(
+            row(2, "b", INSERT, 0, secondInsert.snapshotId()),
+            row(-2, "b", INSERT, 1, overwrite.snapshotId()),
+            row(2, "b", DELETE, 1, overwrite.snapshotId()));
+    Object partialRangeView = partialRangeResult.get(0)[0];
+    assertEquals(
+        "Rows should match",
+        expectedPartialRange,
+        sql("select * from %s order by _change_ordinal, id", partialRangeView));
+  }
+
+  @Test
+  public void testWithCarryovers() {
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap0 = table.currentSnapshot();
+
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+    table.refresh();
+    Snapshot snap1 = table.currentSnapshot();
+
+    // the overwrite re-inserts (2, 'b') twice, so the changelog holds a delete plus two inserts of
+    // the same row
+    sql("INSERT OVERWRITE %s VALUES (-2, 'b'), (2, 'b'), (2, 'b')", tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    // the format string has exactly two placeholders, so only the catalog and table names are
+    // supplied
+    List<Object[]> returns =
+        sql(
+            "CALL %s.system.create_changelog_view("
+                + "remove_carryovers => false,"
+                + "table => '%s')",
+            catalogName, tableName);
+
+    String viewName = (String) returns.get(0)[0];
+    // with remove_carryovers => false every raw changelog row is expected in the view
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", INSERT, 0, snap0.snapshotId()),
+            row(2, "b", INSERT, 1, snap1.snapshotId()),
+            row(-2, "b", INSERT, 2, snap2.snapshotId()),
+            row(2, "b", DELETE, 2, snap2.snapshotId()),
+            row(2, "b", INSERT, 2, snap2.snapshotId()),
+            row(2, "b", INSERT, 2, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id, _change_type", viewName));
+  }
+
+  @Test
+  public void testUpdate() {
+    // repartition by id instead of data before writing any rows
+    sql("ALTER TABLE %s DROP PARTITION FIELD data", tableName);
+    sql("ALTER TABLE %s ADD PARTITION FIELD id", tableName);
+
+    sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b')", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot insertSnapshot = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (3, 'c'), (2, 'd')", tableName);
+    table.refresh();
+    Snapshot overwriteSnapshot = table.currentSnapshot();
+
+    List<Object[]> result =
+        sql(
+            "CALL %s.system.create_changelog_view(table => '%s', identifier_columns => array('id'))",
+            catalogName, tableName);
+    String viewName = (String) result.get(0)[0];
+
+    // the delete/insert pair sharing id=2 is expected as an UPDATE_BEFORE/UPDATE_AFTER pair
+    List<Object[]> expected =
+        ImmutableList.of(
+            row(1, "a", INSERT, 0, insertSnapshot.snapshotId()),
+            row(2, "b", INSERT, 0, insertSnapshot.snapshotId()),
+            row(2, "b", UPDATE_BEFORE, 1, overwriteSnapshot.snapshotId()),
+            row(2, "d", UPDATE_AFTER, 1, overwriteSnapshot.snapshotId()),
+            row(3, "c", INSERT, 1, overwriteSnapshot.snapshotId()));
+    assertEquals(
+        "Rows should match",
+        expected,
+        sql("select * from %s order by _change_ordinal, id, data", viewName));
+  }
+
+  @Test
+  public void testUpdateWithIdentifierField() {
+    // swap the default table for one created by createTableWithIdentifierField()
+    removeTables();
+    createTableWithIdentifierField();
+
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot insertSnapshot = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (3, 'c'), (2, 'd')", tableName);
+    table.refresh();
+    Snapshot overwriteSnapshot = table.currentSnapshot();
+
+    // no identifier_columns argument here; only compute_updates is switched on
+    List<Object[]> result =
+        sql(
+            "CALL %s.system.create_changelog_view(table => '%s', compute_updates => true)",
+            catalogName, tableName);
+    String viewName = (String) result.get(0)[0];
+
+    List<Object[]> expected =
+        ImmutableList.of(
+            row(2, "b", INSERT, 0, insertSnapshot.snapshotId()),
+            row(2, "b", UPDATE_BEFORE, 1, overwriteSnapshot.snapshotId()),
+            row(2, "d", UPDATE_AFTER, 1, overwriteSnapshot.snapshotId()),
+            row(3, "c", INSERT, 1, overwriteSnapshot.snapshotId()));
+    assertEquals(
+        "Rows should match",
+        expected,
+        sql("select * from %s order by _change_ordinal, id, data", viewName));
+  }
+
+  @Test
+  public void testUpdateWithFilter() {
+    // repartition by id so that the filter in the final query is on a partition column
+    sql("ALTER TABLE %s DROP PARTITION FIELD data", tableName);
+    sql("ALTER TABLE %s ADD PARTITION FIELD id", tableName);
+
+    sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b')", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot insertSnapshot = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (3, 'c'), (2, 'd')", tableName);
+    table.refresh();
+    Snapshot overwriteSnapshot = table.currentSnapshot();
+
+    List<Object[]> result =
+        sql(
+            "CALL %s.system.create_changelog_view(table => '%s', identifier_columns => array('id'))",
+            catalogName, tableName);
+    String viewName = (String) result.get(0)[0];
+
+    List<Object[]> expected =
+        ImmutableList.of(
+            row(1, "a", INSERT, 0, insertSnapshot.snapshotId()),
+            row(2, "b", INSERT, 0, insertSnapshot.snapshotId()),
+            row(2, "b", UPDATE_BEFORE, 1, overwriteSnapshot.snapshotId()),
+            row(2, "d", UPDATE_AFTER, 1, overwriteSnapshot.snapshotId()));
+
+    // the predicate on partition columns will filter out the insert of (3, 'c') at the planning
+    // phase
+    assertEquals(
+        "Rows should match",
+        expected,
+        sql("select * from %s where id != 3 order by _change_ordinal, id, data", viewName));
+  }
+
+  @Test
+  public void testUpdateWithMultipleIdentifierColumns() {
+    // swap the default two-column table for the three-column variant
+    removeTables();
+    createTableWith3Columns();
+
+    sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11)", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot insertSnapshot = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (3, 'c', 13), (2, 'd', 11), (2, 'e', 12)", tableName);
+    table.refresh();
+    Snapshot overwriteSnapshot = table.currentSnapshot();
+
+    List<Object[]> result =
+        sql(
+            "CALL %s.system.create_changelog_view("
+                + "identifier_columns => array('id','age'),"
+                + "table => '%s')",
+            catalogName, tableName);
+    String viewName = (String) result.get(0)[0];
+
+    // only (2, 11) matches on both identifier columns, so (2, 'e', 12) stays a plain INSERT
+    List<Object[]> expected =
+        ImmutableList.of(
+            row(1, "a", 12, INSERT, 0, insertSnapshot.snapshotId()),
+            row(2, "b", 11, INSERT, 0, insertSnapshot.snapshotId()),
+            row(2, "b", 11, UPDATE_BEFORE, 1, overwriteSnapshot.snapshotId()),
+            row(2, "d", 11, UPDATE_AFTER, 1, overwriteSnapshot.snapshotId()),
+            row(2, "e", 12, INSERT, 1, overwriteSnapshot.snapshotId()),
+            row(3, "c", 13, INSERT, 1, overwriteSnapshot.snapshotId()));
+    assertEquals(
+        "Rows should match",
+        expected,
+        sql("select * from %s order by _change_ordinal, id, data", viewName));
+  }
+
+  @Test
+  public void testRemoveCarryOvers() {
+    // swap the default two-column table for the three-column variant
+    removeTables();
+    createTableWith3Columns();
+
+    sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot insertSnapshot = table.currentSnapshot();
+
+    // (2, 'e', 12) is rewritten unchanged by the overwrite, which makes it a carry-over row
+    sql("INSERT OVERWRITE %s VALUES (3, 'c', 13), (2, 'd', 11), (2, 'e', 12)", tableName);
+    table.refresh();
+    Snapshot overwriteSnapshot = table.currentSnapshot();
+
+    List<Object[]> result =
+        sql(
+            "CALL %s.system.create_changelog_view("
+                + "identifier_columns => array('id','age'), "
+                + "table => '%s')",
+            catalogName, tableName);
+    String viewName = (String) result.get(0)[0];
+
+    // the carry-over rows (2, 'e', 12, 'DELETE', 1), (2, 'e', 12, 'INSERT', 1) are removed
+    List<Object[]> expected =
+        ImmutableList.of(
+            row(1, "a", 12, INSERT, 0, insertSnapshot.snapshotId()),
+            row(2, "b", 11, INSERT, 0, insertSnapshot.snapshotId()),
+            row(2, "e", 12, INSERT, 0, insertSnapshot.snapshotId()),
+            row(2, "b", 11, UPDATE_BEFORE, 1, overwriteSnapshot.snapshotId()),
+            row(2, "d", 11, UPDATE_AFTER, 1, overwriteSnapshot.snapshotId()),
+            row(3, "c", 13, INSERT, 1, overwriteSnapshot.snapshotId()));
+    assertEquals(
+        "Rows should match",
+        expected,
+        sql("select * from %s order by _change_ordinal, id, data", viewName));
+  }
+
+  @Test
+  public void testRemoveCarryOversWithoutUpdatedRows() {
+    // swap the default two-column table for the three-column variant
+    removeTables();
+    createTableWith3Columns();
+
+    sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot insertSnapshot = table.currentSnapshot();
+
+    // (2, 'e', 12) is rewritten unchanged by the overwrite, which makes it a carry-over row
+    sql("INSERT OVERWRITE %s VALUES (3, 'c', 13), (2, 'd', 11), (2, 'e', 12)", tableName);
+    table.refresh();
+    Snapshot overwriteSnapshot = table.currentSnapshot();
+
+    List<Object[]> result =
+        sql(
+            "CALL %s.system.create_changelog_view("
+                + "compute_updates => false,"
+                + "table => '%s')",
+            catalogName, tableName);
+    String viewName = (String) result.get(0)[0];
+
+    // the carry-over rows (2, 'e', 12, 'DELETE', 1), (2, 'e', 12, 'INSERT', 1) are removed, even
+    // though update-row is not computed
+    List<Object[]> expected =
+        ImmutableList.of(
+            row(1, "a", 12, INSERT, 0, insertSnapshot.snapshotId()),
+            row(2, "b", 11, INSERT, 0, insertSnapshot.snapshotId()),
+            row(2, "e", 12, INSERT, 0, insertSnapshot.snapshotId()),
+            row(2, "b", 11, DELETE, 1, overwriteSnapshot.snapshotId()),
+            row(2, "d", 11, INSERT, 1, overwriteSnapshot.snapshotId()),
+            row(3, "c", 13, INSERT, 1, overwriteSnapshot.snapshotId()));
+    assertEquals(
+        "Rows should match",
+        expected,
+        sql("select * from %s order by _change_ordinal, id, data", viewName));
+  }
+
+  @Test
+  public void testNotRemoveCarryOvers() {
+    removeTables();
+    createTableWith3Columns();
+
+    sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap1 = table.currentSnapshot();
+
+    // carry-over row (2, 'e', 12)
+    sql("INSERT OVERWRITE %s VALUES (3, 'c', 13), (2, 'd', 11), (2, 'e', 12)", tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    List<Object[]> returns =
+        sql(
+            "CALL %s.system.create_changelog_view("
+                + "compute_updates => false,"
+                + "remove_carryovers => false,"
+                + "table => '%s')",
+            catalogName, tableName);
+
+    String viewName = (String) returns.get(0)[0];
+
+    // the carry-over rows (2, 'e', 12, 'DELETE', 1), (2, 'e', 12, 'INSERT', 1) are kept in the
+    // result because remove_carryovers is set to false
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", 12, INSERT, 0, snap1.snapshotId()),
+            row(2, "b", 11, INSERT, 0, snap1.snapshotId()),
+            row(2, "e", 12, INSERT, 0, snap1.snapshotId()),
+            row(2, "b", 11, DELETE, 1, snap2.snapshotId()),
+            row(2, "d", 11, INSERT, 1, snap2.snapshotId()),
+            row(2, "e", 12, DELETE, 1, snap2.snapshotId()),
+            row(2, "e", 12, INSERT, 1, snap2.snapshotId()),
+            row(3, "c", 13, INSERT, 1, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id, data, _change_type", viewName));
+  }
+
+  @Before
+  public void setupTable() {
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='%d')", tableName, 1);
+    sql("ALTER TABLE %s ADD PARTITION FIELD data", tableName);
+  }
+
+  @After
+  public void removeTables() {

Review Comment:
   nit: We usually have these init methods at the top.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.ChangelogIterator;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.spark.api.java.function.MapPartitionsFunction;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.jetbrains.annotations.NotNull;
+import scala.runtime.BoxedUnit;
+
+/**
+ * A procedure that creates a view for changed rows.
+ *
+ * <p>The procedure removes the carry-over rows by default. If you want to keep them, you can set
+ * "remove_carryovers" to be false in the options.
+ *
+ * <p>The procedure doesn't compute the pre/post update images by default. If you want to compute
+ * them, you can set "compute_updates" to be true in the options.
+ *
+ * <p>Carry-over rows are the result of a removal and insertion of the same row within an operation
+ * because of the copy-on-write mechanism. For example, given a file which contains row1 (id=1,
+ * data='a') and row2 (id=2, data='b'). A copy-on-write delete of row2 would require erasing this
+ * file and preserving row1 in a new file. The changelog table would report this as (id=1, data='a',
+ * op='DELETE') and (id=1, data='a', op='INSERT'), despite it not being an actual change to the
+ * table. The procedure finds the carry-over rows and removes them from the result.
+ *
+ * <p>Pre/post update images are converted from a pair of a delete row and an insert row. Identifier
+ * columns are used for determining whether an insert and a delete record refer to the same row. If
+ * the two records share the same values for the identity columns they are considered to be before
+ * and after states of the same row. You can either set identifier fields in the table schema or
+ * input them as the procedure parameters. Here is an example of pre/post update images with an
+ * identifier column(id). A pair of a delete row and an insert row with the same id:
+ *
+ * <ul>
+ *   <li>(id=1, data='a', op='DELETE')
+ *   <li>(id=1, data='b', op='INSERT')
+ * </ul>
+ *
+ * <p>will be marked as pre/post update images:
+ *
+ * <ul>
+ *   <li>(id=1, data='a', op='UPDATE_BEFORE')
+ *   <li>(id=1, data='b', op='UPDATE_AFTER')
+ * </ul>
+ */
+public class CreateChangelogViewProcedure extends BaseProcedure {
+
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {
+        ProcedureParameter.required("table", DataTypes.StringType),
+        ProcedureParameter.optional("changelog_view", DataTypes.StringType),
+        ProcedureParameter.optional("options", STRING_MAP),
+        ProcedureParameter.optional("compute_updates", DataTypes.BooleanType),
+        ProcedureParameter.optional("remove_carryovers", DataTypes.BooleanType),
+        ProcedureParameter.optional("identifier_columns", STRING_ARRAY),
+      };
+
+  private static final int TABLE_NAME_ORDINAL = 0;
+  private static final int CHANGELOG_VIEW_NAME_ORDINAL = 1;
+  private static final int OPTIONS_ORDINAL = 2;
+  private static final int COMPUTE_UPDATES_ORDINAL = 3;
+  private static final int REMOVE_CARRYOVERS_ORDINAL = 4;
+  private static final int IDENTIFIER_COLUMNS_ORDINAL = 5;
+
+  private static final StructType OUTPUT_TYPE =
+      new StructType(
+          new StructField[] {
+            new StructField("changelog_view", DataTypes.StringType, false, Metadata.empty())
+          });
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<CreateChangelogViewProcedure>() {
+      @Override
+      protected CreateChangelogViewProcedure doBuild() {
+        return new CreateChangelogViewProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private CreateChangelogViewProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    Identifier tableIdent =
+        toIdentifier(args.getString(TABLE_NAME_ORDINAL), PARAMETERS[TABLE_NAME_ORDINAL].name());
+
+    // Read data from the table.changes
+    Dataset<Row> df = changelogRecords(changelogTableName(tableIdent), readOptions(args));
+
+    if (computeUpdateImages(args)) {
+      String[] identifierColumns = identifierColumns(args);
+
+      Preconditions.checkArgument(
+          identifierColumns.length > 0,
+          "Cannot compute the update-rows because identifier columns are not set");
+
+      Column[] repartitionColumns = getRepartitionExpr(df, identifierColumns);
+      df = transform(df, repartitionColumns);
+    } else if (removeCarryoverRows(args)) {
+      df = removeCarryoverRows(df);
+    }
+
+    String viewName = viewName(args, tableIdent.name());
+
+    df.createOrReplaceTempView(viewName);
+
+    return toOutputRows(viewName);
+  }
+
+  private boolean computeUpdateImages(InternalRow args) {
+    if (!args.isNullAt(COMPUTE_UPDATES_ORDINAL)) {
+      return args.getBoolean(COMPUTE_UPDATES_ORDINAL);
+    }
+
+    // If the identifier columns are set, we compute pre/post update images by default.
+    if (!args.isNullAt(IDENTIFIER_COLUMNS_ORDINAL)) {
+      return true;
+    }
+
+    return false;
+  }
+
+  private boolean removeCarryoverRows(InternalRow args) {
+    return args.isNullAt(REMOVE_CARRYOVERS_ORDINAL)
+        ? true
+        : args.getBoolean(REMOVE_CARRYOVERS_ORDINAL);
+  }
+
+  private Dataset<Row> removeCarryoverRows(Dataset<Row> df) {
+    Column[] repartitionColumns =
+        Arrays.stream(df.columns())
+            .filter(c -> !c.equals(MetadataColumns.CHANGE_TYPE.name()))
+            .map(df::col)
+            .toArray(Column[]::new);
+    return transform(df, repartitionColumns);
+  }
+
+  private String[] identifierColumns(InternalRow args) {
+    String[] identifierColumns = new String[0];
+
+    if (!args.isNullAt(IDENTIFIER_COLUMNS_ORDINAL)) {
+      identifierColumns =
+          Arrays.stream(args.getArray(IDENTIFIER_COLUMNS_ORDINAL).array())
+              .map(column -> column.toString())
+              .toArray(String[]::new);
+    }
+
+    if (identifierColumns.length == 0) {
+      Identifier tableIdent =
+          toIdentifier(args.getString(TABLE_NAME_ORDINAL), PARAMETERS[TABLE_NAME_ORDINAL].name());
+      Table table = loadSparkTable(tableIdent).table();
+      identifierColumns = table.schema().identifierFieldNames().toArray(new String[0]);
+    }
+
+    return identifierColumns;
+  }
+
+  private Dataset<Row> changelogRecords(String tableName, Map<String, String> readOptions) {
+    // no need to validate the read options here since the reader will validate them
+    return spark().read().options(readOptions).table(tableName);
+  }
+
+  private String changelogTableName(Identifier tableIdent) {

Review Comment:
   I'd make this return an `Identifier` and accept identifiers in `loadTable`. That would better align with the existing code in other procedures.
   
   ```
   Identifier changelogTableIdent = changelogTableIdent(tableIdent);
   Dataset<Row> df = loadTable(changelogTableIdent, options(args));
   ```



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.ChangelogIterator;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.spark.api.java.function.MapPartitionsFunction;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.jetbrains.annotations.NotNull;
+import scala.runtime.BoxedUnit;
+
+/**
+ * A procedure that creates a view for changed rows.
+ *
+ * <p>The procedure removes the carry-over rows by default. If you want to keep them, you can set
+ * "remove_carryovers" to be false in the options.
+ *
+ * <p>The procedure doesn't compute the pre/post update images by default. If you want to compute
+ * them, you can set "compute_updates" to be true in the options.
+ *
+ * <p>Carry-over rows are the result of a removal and insertion of the same row within an operation
+ * because of the copy-on-write mechanism. For example, given a file which contains row1 (id=1,
+ * data='a') and row2 (id=2, data='b'). A copy-on-write delete of row2 would require erasing this
+ * file and preserving row1 in a new file. The changelog table would report this as (id=1, data='a',
+ * op='DELETE') and (id=1, data='a', op='INSERT'), despite it not being an actual change to the
+ * table. The procedure finds the carry-over rows and removes them from the result.
+ *
+ * <p>Pre/post update images are converted from a pair of a delete row and an insert row. Identifier
+ * columns are used for determining whether an insert and a delete record refer to the same row. If
+ * the two records share the same values for the identity columns they are considered to be before
+ * and after states of the same row. You can either set identifier fields in the table schema or
+ * input them as the procedure parameters. Here is an example of pre/post update images with an
+ * identifier column(id). A pair of a delete row and an insert row with the same id:
+ *
+ * <ul>
+ *   <li>(id=1, data='a', op='DELETE')
+ *   <li>(id=1, data='b', op='INSERT')
+ * </ul>
+ *
+ * <p>will be marked as pre/post update images:
+ *
+ * <ul>
+ *   <li>(id=1, data='a', op='UPDATE_BEFORE')
+ *   <li>(id=1, data='b', op='UPDATE_AFTER')
+ * </ul>
+ */
+public class CreateChangelogViewProcedure extends BaseProcedure {
+
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {
+        ProcedureParameter.required("table", DataTypes.StringType),
+        ProcedureParameter.optional("changelog_view", DataTypes.StringType),
+        ProcedureParameter.optional("options", STRING_MAP),
+        ProcedureParameter.optional("compute_updates", DataTypes.BooleanType),
+        ProcedureParameter.optional("remove_carryovers", DataTypes.BooleanType),
+        ProcedureParameter.optional("identifier_columns", STRING_ARRAY),
+      };
+
+  private static final int TABLE_NAME_ORDINAL = 0;
+  private static final int CHANGELOG_VIEW_NAME_ORDINAL = 1;
+  private static final int OPTIONS_ORDINAL = 2;
+  private static final int COMPUTE_UPDATES_ORDINAL = 3;
+  private static final int REMOVE_CARRYOVERS_ORDINAL = 4;
+  private static final int IDENTIFIER_COLUMNS_ORDINAL = 5;
+
+  private static final StructType OUTPUT_TYPE =
+      new StructType(
+          new StructField[] {
+            new StructField("changelog_view", DataTypes.StringType, false, Metadata.empty())
+          });
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<CreateChangelogViewProcedure>() {
+      @Override
+      protected CreateChangelogViewProcedure doBuild() {
+        return new CreateChangelogViewProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private CreateChangelogViewProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    Identifier tableIdent =
+        toIdentifier(args.getString(TABLE_NAME_ORDINAL), PARAMETERS[TABLE_NAME_ORDINAL].name());
+
+    // Read data from the table.changes
+    Dataset<Row> df = changelogRecords(changelogTableName(tableIdent), readOptions(args));
+
+    if (computeUpdateImages(args)) {
+      String[] identifierColumns = identifierColumns(args);
+
+      Preconditions.checkArgument(
+          identifierColumns.length > 0,
+          "Cannot compute the update-rows because identifier columns are not set");
+
+      Column[] repartitionColumns = getRepartitionExpr(df, identifierColumns);
+      df = transform(df, repartitionColumns);
+    } else if (removeCarryoverRows(args)) {
+      df = removeCarryoverRows(df);
+    }
+
+    String viewName = viewName(args, tableIdent.name());
+
+    df.createOrReplaceTempView(viewName);
+
+    return toOutputRows(viewName);
+  }
+
+  private boolean computeUpdateImages(InternalRow args) {
+    if (!args.isNullAt(COMPUTE_UPDATES_ORDINAL)) {
+      return args.getBoolean(COMPUTE_UPDATES_ORDINAL);
+    }
+
+    // If the identifier columns are set, we compute pre/post update images by default.
+    if (!args.isNullAt(IDENTIFIER_COLUMNS_ORDINAL)) {
+      return true;
+    }
+
+    return false;
+  }
+
+  private boolean removeCarryoverRows(InternalRow args) {
+    return args.isNullAt(REMOVE_CARRYOVERS_ORDINAL)
+        ? true
+        : args.getBoolean(REMOVE_CARRYOVERS_ORDINAL);
+  }
+
+  private Dataset<Row> removeCarryoverRows(Dataset<Row> df) {
+    Column[] repartitionColumns =
+        Arrays.stream(df.columns())
+            .filter(c -> !c.equals(MetadataColumns.CHANGE_TYPE.name()))
+            .map(df::col)
+            .toArray(Column[]::new);
+    return transform(df, repartitionColumns);
+  }
+
+  private String[] identifierColumns(InternalRow args) {
+    String[] identifierColumns = new String[0];
+
+    if (!args.isNullAt(IDENTIFIER_COLUMNS_ORDINAL)) {
+      identifierColumns =
+          Arrays.stream(args.getArray(IDENTIFIER_COLUMNS_ORDINAL).array())
+              .map(column -> column.toString())
+              .toArray(String[]::new);
+    }
+
+    if (identifierColumns.length == 0) {
+      Identifier tableIdent =
+          toIdentifier(args.getString(TABLE_NAME_ORDINAL), PARAMETERS[TABLE_NAME_ORDINAL].name());
+      Table table = loadSparkTable(tableIdent).table();
+      identifierColumns = table.schema().identifierFieldNames().toArray(new String[0]);
+    }
+
+    return identifierColumns;
+  }
+
+  private Dataset<Row> changelogRecords(String tableName, Map<String, String> readOptions) {
+    // no need to validate the read options here since the reader will validate them
+    return spark().read().options(readOptions).table(tableName);
+  }
+
+  private String changelogTableName(Identifier tableIdent) {
+    List<String> namespace = Lists.newArrayList();
+    namespace.addAll(Arrays.asList(tableIdent.namespace()));
+    namespace.add(tableIdent.name());
+    Identifier changelogTableIdent =
+        Identifier.of(namespace.toArray(new String[0]), SparkChangelogTable.TABLE_NAME);
+    return Spark3Util.quotedFullIdentifier(tableCatalog().name(), changelogTableIdent);
+  }
+
+  private Map<String, String> readOptions(InternalRow args) {

Review Comment:
   nit: Rename this to just `options`?



##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java:
##########
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.ChangelogOperation;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestCreateChangelogViewProcedure extends SparkExtensionsTestBase {
+  private static final String DELETE = ChangelogOperation.DELETE.name();
+  private static final String INSERT = ChangelogOperation.INSERT.name();
+  private static final String UPDATE_BEFORE = ChangelogOperation.UPDATE_BEFORE.name();
+  private static final String UPDATE_AFTER = ChangelogOperation.UPDATE_AFTER.name();
+
+  public TestCreateChangelogViewProcedure(
+      String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Test
+  public void testCustomizedViewName() {
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Snapshot snap1 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
+
+    table.refresh();
+
+    Snapshot snap2 = table.currentSnapshot();
+
+    sql(
+        "CALL %s.system.create_changelog_view("
+            + "table => '%s',"
+            + "options => map('%s','%s','%s','%s'),"
+            + "changelog_view => '%s')",
+        catalogName,
+        tableName,
+        SparkReadOptions.START_SNAPSHOT_ID,
+        snap1.snapshotId(),
+        SparkReadOptions.END_SNAPSHOT_ID,
+        snap2.snapshotId(),
+        "cdc_view");
+
+    long rowCount = sql("select * from %s", "cdc_view").stream().count();
+    Assert.assertEquals(2, rowCount);
+  }
+
+  @Test
+  public void testNoSnapshotIdInput() {
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap0 = table.currentSnapshot();
+
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+    table.refresh();
+    Snapshot snap1 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    List<Object[]> returns =
+        sql(
+            "CALL %s.system.create_changelog_view(" + "table => '%s')",
+            catalogName, tableName, "cdc_view");
+
+    String viewName = (String) returns.get(0)[0];
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", INSERT, 0, snap0.snapshotId()),
+            row(2, "b", INSERT, 1, snap1.snapshotId()),
+            row(-2, "b", INSERT, 2, snap2.snapshotId()),
+            row(2, "b", DELETE, 2, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id", viewName));
+  }
+
+  @Test
+  public void testTimestampsBasedQuery() {
+    long beginning = System.currentTimeMillis();
+
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap0 = table.currentSnapshot();
+    long afterFirstInsert = waitUntilAfter(snap0.timestampMillis());
+
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+    table.refresh();
+    Snapshot snap1 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    long afterInsertOverwrite = waitUntilAfter(snap2.timestampMillis());
+    List<Object[]> returns =
+        sql(
+            "CALL %s.system.create_changelog_view(table => '%s', "
+                + "options => map('%s', '%s','%s', '%s'))",
+            catalogName,
+            tableName,
+            SparkReadOptions.START_TIMESTAMP,
+            beginning,
+            SparkReadOptions.END_TIMESTAMP,
+            afterInsertOverwrite);
+
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", INSERT, 0, snap0.snapshotId()),
+            row(2, "b", INSERT, 1, snap1.snapshotId()),
+            row(-2, "b", INSERT, 2, snap2.snapshotId()),
+            row(2, "b", DELETE, 2, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id", returns.get(0)[0]));
+
+    // query the timestamps starting from the second insert
+    returns =
+        sql(
+            "CALL %s.system.create_changelog_view(table => '%s', "
+                + "options => map('%s', '%s', '%s', '%s'))",
+            catalogName,
+            tableName,
+            SparkReadOptions.START_TIMESTAMP,
+            afterFirstInsert,
+            SparkReadOptions.END_TIMESTAMP,
+            afterInsertOverwrite);
+
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(2, "b", INSERT, 0, snap1.snapshotId()),
+            row(-2, "b", INSERT, 1, snap2.snapshotId()),
+            row(2, "b", DELETE, 1, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id", returns.get(0)[0]));
+  }
+
+  @Test
+  public void testWithCarryovers() {
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap0 = table.currentSnapshot();
+
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+    table.refresh();
+    Snapshot snap1 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (-2, 'b'), (2, 'b'), (2, 'b')", tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    List<Object[]> returns =
+        sql(
+            "CALL %s.system.create_changelog_view("
+                + "remove_carryovers => false,"
+                + "table => '%s')",
+            catalogName, tableName, "cdc_view");
+
+    String viewName = (String) returns.get(0)[0];
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", INSERT, 0, snap0.snapshotId()),
+            row(2, "b", INSERT, 1, snap1.snapshotId()),
+            row(-2, "b", INSERT, 2, snap2.snapshotId()),
+            row(2, "b", DELETE, 2, snap2.snapshotId()),
+            row(2, "b", INSERT, 2, snap2.snapshotId()),
+            row(2, "b", INSERT, 2, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id, _change_type", viewName));
+  }
+
+  @Test
+  public void testUpdate() {
+    sql("ALTER TABLE %s DROP PARTITION FIELD data", tableName);
+    sql("ALTER TABLE %s ADD PARTITION FIELD id", tableName);
+
+    sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b')", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap1 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (3, 'c'), (2, 'd')", tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    List<Object[]> returns =
+        sql(
+            "CALL %s.system.create_changelog_view(table => '%s', identifier_columns => array('id'))",
+            catalogName, tableName);
+
+    String viewName = (String) returns.get(0)[0];
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", INSERT, 0, snap1.snapshotId()),
+            row(2, "b", INSERT, 0, snap1.snapshotId()),
+            row(2, "b", UPDATE_BEFORE, 1, snap2.snapshotId()),
+            row(2, "d", UPDATE_AFTER, 1, snap2.snapshotId()),
+            row(3, "c", INSERT, 1, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id, data", viewName));
+  }
+
+  @Test
+  public void testUpdateWithIdentifierField() {
+    removeTables();
+    createTableWithIdentifierField();
+
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap1 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (3, 'c'), (2, 'd')", tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    List<Object[]> returns =
+        sql(
+            "CALL %s.system.create_changelog_view(table => '%s', compute_updates => true)",
+            catalogName, tableName);
+
+    String viewName = (String) returns.get(0)[0];
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(2, "b", INSERT, 0, snap1.snapshotId()),
+            row(2, "b", UPDATE_BEFORE, 1, snap2.snapshotId()),
+            row(2, "d", UPDATE_AFTER, 1, snap2.snapshotId()),
+            row(3, "c", INSERT, 1, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id, data", viewName));
+  }
+
+  @Test
+  public void testUpdateWithFilter() {
+    sql("ALTER TABLE %s DROP PARTITION FIELD data", tableName);
+    sql("ALTER TABLE %s ADD PARTITION FIELD id", tableName);
+
+    sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b')", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap1 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (3, 'c'), (2, 'd')", tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    List<Object[]> returns =
+        sql(
+            "CALL %s.system.create_changelog_view(table => '%s', identifier_columns => array('id'))",
+            catalogName, tableName);
+
+    String viewName = (String) returns.get(0)[0];
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", INSERT, 0, snap1.snapshotId()),
+            row(2, "b", INSERT, 0, snap1.snapshotId()),
+            row(2, "b", UPDATE_BEFORE, 1, snap2.snapshotId()),
+            row(2, "d", UPDATE_AFTER, 1, snap2.snapshotId())),
+        // the predicate on partition columns will filter out the insert of (3, 'c') at the planning
+        // phase
+        sql("select * from %s where id != 3 order by _change_ordinal, id, data", viewName));
+  }
+
+  @Test
+  public void testUpdateWithMultipleIdentifierColumns() {
+    removeTables();
+    createTableWith3Columns();
+
+    sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11)", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap1 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (3, 'c', 13), (2, 'd', 11), (2, 'e', 12)", tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    List<Object[]> returns =
+        sql(
+            "CALL %s.system.create_changelog_view("
+                + "identifier_columns => array('id','age'),"
+                + "table => '%s')",
+            catalogName, tableName);
+
+    String viewName = (String) returns.get(0)[0];
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", 12, INSERT, 0, snap1.snapshotId()),
+            row(2, "b", 11, INSERT, 0, snap1.snapshotId()),
+            row(2, "b", 11, UPDATE_BEFORE, 1, snap2.snapshotId()),
+            row(2, "d", 11, UPDATE_AFTER, 1, snap2.snapshotId()),
+            row(2, "e", 12, INSERT, 1, snap2.snapshotId()),
+            row(3, "c", 13, INSERT, 1, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id, data", viewName));
+  }
+
+  @Test
+  public void testRemoveCarryOvers() {
+    removeTables();
+    createTableWith3Columns();
+
+    sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap1 = table.currentSnapshot();
+
+    // carry-over row (2, 'e', 12)
+    sql("INSERT OVERWRITE %s VALUES (3, 'c', 13), (2, 'd', 11), (2, 'e', 12)", tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    List<Object[]> returns =
+        sql(
+            "CALL %s.system.create_changelog_view("
+                + "identifier_columns => array('id','age'), "
+                + "table => '%s')",
+            catalogName, tableName);
+
+    String viewName = (String) returns.get(0)[0];
+    // the carry-over rows (2, 'e', 12, 'DELETE', 1), (2, 'e', 12, 'INSERT', 1) are removed
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", 12, INSERT, 0, snap1.snapshotId()),
+            row(2, "b", 11, INSERT, 0, snap1.snapshotId()),
+            row(2, "e", 12, INSERT, 0, snap1.snapshotId()),
+            row(2, "b", 11, UPDATE_BEFORE, 1, snap2.snapshotId()),
+            row(2, "d", 11, UPDATE_AFTER, 1, snap2.snapshotId()),
+            row(3, "c", 13, INSERT, 1, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id, data", viewName));
+  }
+
+  @Test
+  public void testRemoveCarryOversWithoutUpdatedRows() {
+    removeTables();
+    createTableWith3Columns();
+
+    sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap1 = table.currentSnapshot();
+
+    // carry-over row (2, 'e', 12)
+    sql("INSERT OVERWRITE %s VALUES (3, 'c', 13), (2, 'd', 11), (2, 'e', 12)", tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    List<Object[]> returns =
+        sql(
+            "CALL %s.system.create_changelog_view("
+                + "compute_updates => false,"
+                + "table => '%s')",
+            catalogName, tableName);
+
+    String viewName = (String) returns.get(0)[0];
+
+    // the carry-over rows (2, 'e', 12, 'DELETE', 1), (2, 'e', 12, 'INSERT', 1) are removed, even
+    // though update-row is not computed
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", 12, INSERT, 0, snap1.snapshotId()),
+            row(2, "b", 11, INSERT, 0, snap1.snapshotId()),
+            row(2, "e", 12, INSERT, 0, snap1.snapshotId()),
+            row(2, "b", 11, DELETE, 1, snap2.snapshotId()),
+            row(2, "d", 11, INSERT, 1, snap2.snapshotId()),
+            row(3, "c", 13, INSERT, 1, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id, data", viewName));
+  }
+
+  @Test
+  public void testNotRemoveCarryOvers() {
+    removeTables();

Review Comment:
   Why do we explicitly call this if we have an `After` method to clean up tables? Is it because we always create a default table first? If so, can we remove the `Before` init method and just call the correct create method in each test?



##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java:
##########
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.ChangelogOperation;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestCreateChangelogViewProcedure extends SparkExtensionsTestBase {
+  private static final String DELETE = ChangelogOperation.DELETE.name();
+  private static final String INSERT = ChangelogOperation.INSERT.name();
+  private static final String UPDATE_BEFORE = ChangelogOperation.UPDATE_BEFORE.name();
+  private static final String UPDATE_AFTER = ChangelogOperation.UPDATE_AFTER.name();
+
+  public TestCreateChangelogViewProcedure(
+      String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @Test
+  public void testCustomizedViewName() {
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Snapshot snap1 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
+
+    table.refresh();
+
+    Snapshot snap2 = table.currentSnapshot();
+
+    sql(
+        "CALL %s.system.create_changelog_view("
+            + "table => '%s',"
+            + "options => map('%s','%s','%s','%s'),"
+            + "changelog_view => '%s')",
+        catalogName,
+        tableName,
+        SparkReadOptions.START_SNAPSHOT_ID,
+        snap1.snapshotId(),
+        SparkReadOptions.END_SNAPSHOT_ID,
+        snap2.snapshotId(),
+        "cdc_view");
+
+    long rowCount = sql("select * from %s", "cdc_view").stream().count();
+    Assert.assertEquals(2, rowCount);
+  }
+
+  @Test
+  public void testNoSnapshotIdInput() {
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap0 = table.currentSnapshot();
+
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+    table.refresh();
+    Snapshot snap1 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    List<Object[]> returns =
+        sql(
+            "CALL %s.system.create_changelog_view(" + "table => '%s')",
+            catalogName, tableName, "cdc_view");
+
+    String viewName = (String) returns.get(0)[0];
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", INSERT, 0, snap0.snapshotId()),
+            row(2, "b", INSERT, 1, snap1.snapshotId()),
+            row(-2, "b", INSERT, 2, snap2.snapshotId()),
+            row(2, "b", DELETE, 2, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id", viewName));
+  }
+
+  @Test
+  public void testTimestampsBasedQuery() {
+    long beginning = System.currentTimeMillis();
+
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap0 = table.currentSnapshot();
+    long afterFirstInsert = waitUntilAfter(snap0.timestampMillis());
+
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+    table.refresh();
+    Snapshot snap1 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    long afterInsertOverwrite = waitUntilAfter(snap2.timestampMillis());
+    List<Object[]> returns =
+        sql(
+            "CALL %s.system.create_changelog_view(table => '%s', "
+                + "options => map('%s', '%s','%s', '%s'))",
+            catalogName,
+            tableName,
+            SparkReadOptions.START_TIMESTAMP,
+            beginning,
+            SparkReadOptions.END_TIMESTAMP,
+            afterInsertOverwrite);
+
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", INSERT, 0, snap0.snapshotId()),
+            row(2, "b", INSERT, 1, snap1.snapshotId()),
+            row(-2, "b", INSERT, 2, snap2.snapshotId()),
+            row(2, "b", DELETE, 2, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id", returns.get(0)[0]));
+
+    // query the timestamps starting from the second insert
+    returns =
+        sql(
+            "CALL %s.system.create_changelog_view(table => '%s', "
+                + "options => map('%s', '%s', '%s', '%s'))",
+            catalogName,
+            tableName,
+            SparkReadOptions.START_TIMESTAMP,
+            afterFirstInsert,
+            SparkReadOptions.END_TIMESTAMP,
+            afterInsertOverwrite);
+
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(2, "b", INSERT, 0, snap1.snapshotId()),
+            row(-2, "b", INSERT, 1, snap2.snapshotId()),
+            row(2, "b", DELETE, 1, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id", returns.get(0)[0]));
+  }
+
+  @Test
+  public void testWithCarryovers() {
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap0 = table.currentSnapshot();
+
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+    table.refresh();
+    Snapshot snap1 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (-2, 'b'), (2, 'b'), (2, 'b')", tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    List<Object[]> returns =
+        sql(
+            "CALL %s.system.create_changelog_view("
+                + "remove_carryovers => false,"
+                + "table => '%s')",
+            catalogName, tableName, "cdc_view");
+
+    String viewName = (String) returns.get(0)[0];
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", INSERT, 0, snap0.snapshotId()),
+            row(2, "b", INSERT, 1, snap1.snapshotId()),
+            row(-2, "b", INSERT, 2, snap2.snapshotId()),
+            row(2, "b", DELETE, 2, snap2.snapshotId()),
+            row(2, "b", INSERT, 2, snap2.snapshotId()),
+            row(2, "b", INSERT, 2, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id, _change_type", viewName));
+  }
+
+  @Test
+  public void testUpdate() {
+    sql("ALTER TABLE %s DROP PARTITION FIELD data", tableName);
+    sql("ALTER TABLE %s ADD PARTITION FIELD id", tableName);
+
+    sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b')", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap1 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (3, 'c'), (2, 'd')", tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    List<Object[]> returns =
+        sql(
+            "CALL %s.system.create_changelog_view(table => '%s', identifier_columns => array('id'))",
+            catalogName, tableName);
+
+    String viewName = (String) returns.get(0)[0];
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", INSERT, 0, snap1.snapshotId()),
+            row(2, "b", INSERT, 0, snap1.snapshotId()),
+            row(2, "b", UPDATE_BEFORE, 1, snap2.snapshotId()),
+            row(2, "d", UPDATE_AFTER, 1, snap2.snapshotId()),
+            row(3, "c", INSERT, 1, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id, data", viewName));
+  }
+
+  @Test
+  public void testUpdateWithIdentifierField() {
+    removeTables();
+    createTableWithIdentifierField();
+
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap1 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (3, 'c'), (2, 'd')", tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    List<Object[]> returns =
+        sql(
+            "CALL %s.system.create_changelog_view(table => '%s', compute_updates => true)",
+            catalogName, tableName);
+
+    String viewName = (String) returns.get(0)[0];
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(2, "b", INSERT, 0, snap1.snapshotId()),
+            row(2, "b", UPDATE_BEFORE, 1, snap2.snapshotId()),
+            row(2, "d", UPDATE_AFTER, 1, snap2.snapshotId()),
+            row(3, "c", INSERT, 1, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id, data", viewName));
+  }
+
+  @Test
+  public void testUpdateWithFilter() {
+    sql("ALTER TABLE %s DROP PARTITION FIELD data", tableName);
+    sql("ALTER TABLE %s ADD PARTITION FIELD id", tableName);
+
+    sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b')", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap1 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (3, 'c'), (2, 'd')", tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    List<Object[]> returns =
+        sql(
+            "CALL %s.system.create_changelog_view(table => '%s', identifier_columns => array('id'))",
+            catalogName, tableName);
+
+    String viewName = (String) returns.get(0)[0];
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", INSERT, 0, snap1.snapshotId()),
+            row(2, "b", INSERT, 0, snap1.snapshotId()),
+            row(2, "b", UPDATE_BEFORE, 1, snap2.snapshotId()),
+            row(2, "d", UPDATE_AFTER, 1, snap2.snapshotId())),
+        // the predicate on partition columns will filter out the insert of (3, 'c') at the planning
+        // phase
+        sql("select * from %s where id != 3 order by _change_ordinal, id, data", viewName));
+  }
+
+  @Test
+  public void testUpdateWithMultipleIdentifierColumns() {
+    removeTables();
+    createTableWith3Columns();
+
+    sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11)", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap1 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (3, 'c', 13), (2, 'd', 11), (2, 'e', 12)", tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    List<Object[]> returns =
+        sql(
+            "CALL %s.system.create_changelog_view("
+                + "identifier_columns => array('id','age'),"
+                + "table => '%s')",
+            catalogName, tableName);
+
+    String viewName = (String) returns.get(0)[0];
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", 12, INSERT, 0, snap1.snapshotId()),
+            row(2, "b", 11, INSERT, 0, snap1.snapshotId()),
+            row(2, "b", 11, UPDATE_BEFORE, 1, snap2.snapshotId()),
+            row(2, "d", 11, UPDATE_AFTER, 1, snap2.snapshotId()),
+            row(2, "e", 12, INSERT, 1, snap2.snapshotId()),
+            row(3, "c", 13, INSERT, 1, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id, data", viewName));
+  }
+
+  @Test
+  public void testRemoveCarryOvers() {
+    removeTables();
+    createTableWith3Columns();
+
+    sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap1 = table.currentSnapshot();
+
+    // carry-over row (2, 'e', 12)
+    sql("INSERT OVERWRITE %s VALUES (3, 'c', 13), (2, 'd', 11), (2, 'e', 12)", tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    List<Object[]> returns =
+        sql(
+            "CALL %s.system.create_changelog_view("
+                + "identifier_columns => array('id','age'), "
+                + "table => '%s')",
+            catalogName, tableName);
+
+    String viewName = (String) returns.get(0)[0];
+    // the carry-over rows (2, 'e', 12, 'DELETE', 1), (2, 'e', 12, 'INSERT', 1) are removed
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", 12, INSERT, 0, snap1.snapshotId()),
+            row(2, "b", 11, INSERT, 0, snap1.snapshotId()),
+            row(2, "e", 12, INSERT, 0, snap1.snapshotId()),
+            row(2, "b", 11, UPDATE_BEFORE, 1, snap2.snapshotId()),
+            row(2, "d", 11, UPDATE_AFTER, 1, snap2.snapshotId()),
+            row(3, "c", 13, INSERT, 1, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id, data", viewName));
+  }
+
+  @Test
+  public void testRemoveCarryOversWithoutUpdatedRows() {
+    removeTables();
+    createTableWith3Columns();
+
+    sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap1 = table.currentSnapshot();
+
+    // carry-over row (2, 'e', 12)
+    sql("INSERT OVERWRITE %s VALUES (3, 'c', 13), (2, 'd', 11), (2, 'e', 12)", tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    List<Object[]> returns =
+        sql(
+            "CALL %s.system.create_changelog_view("
+                + "compute_updates => false,"
+                + "table => '%s')",
+            catalogName, tableName);
+
+    String viewName = (String) returns.get(0)[0];
+
+    // the carry-over rows (2, 'e', 12, 'DELETE', 1), (2, 'e', 12, 'INSERT', 1) are removed, even
+    // though update-row is not computed
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", 12, INSERT, 0, snap1.snapshotId()),
+            row(2, "b", 11, INSERT, 0, snap1.snapshotId()),
+            row(2, "e", 12, INSERT, 0, snap1.snapshotId()),
+            row(2, "b", 11, DELETE, 1, snap2.snapshotId()),
+            row(2, "d", 11, INSERT, 1, snap2.snapshotId()),
+            row(3, "c", 13, INSERT, 1, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id, data", viewName));
+  }
+
+  @Test
+  public void testNotRemoveCarryOvers() {
+    removeTables();
+    createTableWith3Columns();
+
+    sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap1 = table.currentSnapshot();
+
+    // carry-over row (2, 'e', 12)
+    sql("INSERT OVERWRITE %s VALUES (3, 'c', 13), (2, 'd', 11), (2, 'e', 12)", tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    List<Object[]> returns =
+        sql(
+            "CALL %s.system.create_changelog_view("
+                + "compute_updates => false,"
+                + "remove_carryovers => false,"
+                + "table => '%s')",
+            catalogName, tableName);
+
+    String viewName = (String) returns.get(0)[0];
+
+    // the carry-over rows (2, 'e', 12, 'DELETE', 1), (2, 'e', 12, 'INSERT', 1) are removed, even

Review Comment:
   Is this comment accurate? I thought we were supposed to keep carryovers in this case.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.ChangelogIterator;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.spark.api.java.function.MapPartitionsFunction;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.jetbrains.annotations.NotNull;
+import scala.runtime.BoxedUnit;
+
+/**
+ * A procedure that creates a view for changed rows.
+ *
+ * <p>The procedure removes the carry-over rows by default. If you want to keep them, you can set
+ * "remove_carryovers" to be false in the options.
+ *
+ * <p>The procedure doesn't compute the pre/post update images by default. If you want to compute
+ * them, you can set "compute_updates" to be true in the options.
+ *
+ * <p>Carry-over rows are the result of a removal and insertion of the same row within an operation
+ * because of the copy-on-write mechanism. For example, given a file which contains row1 (id=1,
+ * data='a') and row2 (id=2, data='b'). A copy-on-write delete of row2 would require erasing this
+ * file and preserving row1 in a new file. The changelog table would report this as (id=1, data='a',
+ * op='DELETE') and (id=1, data='a', op='INSERT'), despite it not being an actual change to the
+ * table. The procedure finds the carry-over rows and removes them from the result.
+ *
+ * <p>Pre/post update images are converted from a pair of a delete row and an insert row. Identifier
+ * columns are used for determining whether an insert and a delete record refer to the same row. If
+ * the two records share the same values for the identity columns they are considered to be before
+ * and after states of the same row. You can either set identifier fields in the table schema or
+ * input them as the procedure parameters. Here is an example of pre/post update images with an
+ * identifier column(id). A pair of a delete row and an insert row with the same id:
+ *
+ * <ul>
+ *   <li>(id=1, data='a', op='DELETE')
+ *   <li>(id=1, data='b', op='INSERT')
+ * </ul>
+ *
+ * <p>will be marked as pre/post update images:
+ *
+ * <ul>
+ *   <li>(id=1, data='a', op='UPDATE_BEFORE')
+ *   <li>(id=1, data='b', op='UPDATE_AFTER')
+ * </ul>
+ */
+public class CreateChangelogViewProcedure extends BaseProcedure {
+
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {
+        ProcedureParameter.required("table", DataTypes.StringType),
+        ProcedureParameter.optional("changelog_view", DataTypes.StringType),
+        ProcedureParameter.optional("options", STRING_MAP),
+        ProcedureParameter.optional("compute_updates", DataTypes.BooleanType),
+        ProcedureParameter.optional("remove_carryovers", DataTypes.BooleanType),
+        ProcedureParameter.optional("identifier_columns", STRING_ARRAY),
+      };
+
+  private static final int TABLE_NAME_ORDINAL = 0;
+  private static final int CHANGELOG_VIEW_NAME_ORDINAL = 1;
+  private static final int OPTIONS_ORDINAL = 2;
+  private static final int COMPUTE_UPDATES_ORDINAL = 3;
+  private static final int REMOVE_CARRYOVERS_ORDINAL = 4;
+  private static final int IDENTIFIER_COLUMNS_ORDINAL = 5;
+
+  private static final StructType OUTPUT_TYPE =
+      new StructType(
+          new StructField[] {
+            new StructField("changelog_view", DataTypes.StringType, false, Metadata.empty())
+          });
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<CreateChangelogViewProcedure>() {
+      @Override
+      protected CreateChangelogViewProcedure doBuild() {
+        return new CreateChangelogViewProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private CreateChangelogViewProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    Identifier tableIdent =
+        toIdentifier(args.getString(TABLE_NAME_ORDINAL), PARAMETERS[TABLE_NAME_ORDINAL].name());
+
+    // Read data from the table.changes
+    Dataset<Row> df = changelogRecords(changelogTableName(tableIdent), readOptions(args));
+
+    if (computeUpdateImages(args)) {
+      String[] identifierColumns = identifierColumns(args);
+
+      Preconditions.checkArgument(
+          identifierColumns.length > 0,
+          "Cannot compute the update-rows because identifier columns are not set");
+
+      Column[] repartitionColumns = getRepartitionExpr(df, identifierColumns);
+      df = transform(df, repartitionColumns);
+    } else if (removeCarryoverRows(args)) {
+      df = removeCarryoverRows(df);
+    }
+
+    String viewName = viewName(args, tableIdent.name());
+
+    df.createOrReplaceTempView(viewName);
+
+    return toOutputRows(viewName);
+  }
+
+  private boolean computeUpdateImages(InternalRow args) {
+    if (!args.isNullAt(COMPUTE_UPDATES_ORDINAL)) {
+      return args.getBoolean(COMPUTE_UPDATES_ORDINAL);
+    }
+
+    // If the identifier columns are set, we compute pre/post update images by default.
+    if (!args.isNullAt(IDENTIFIER_COLUMNS_ORDINAL)) {
+      return true;
+    }
+
+    return false;
+  }
+
+  private boolean removeCarryoverRows(InternalRow args) {
+    return args.isNullAt(REMOVE_CARRYOVERS_ORDINAL)
+        ? true
+        : args.getBoolean(REMOVE_CARRYOVERS_ORDINAL);
+  }
+
+  private Dataset<Row> removeCarryoverRows(Dataset<Row> df) {
+    Column[] repartitionColumns =
+        Arrays.stream(df.columns())
+            .filter(c -> !c.equals(MetadataColumns.CHANGE_TYPE.name()))
+            .map(df::col)
+            .toArray(Column[]::new);
+    return transform(df, repartitionColumns);
+  }
+
+  private String[] identifierColumns(InternalRow args) {
+    String[] identifierColumns = new String[0];
+
+    if (!args.isNullAt(IDENTIFIER_COLUMNS_ORDINAL)) {
+      identifierColumns =
+          Arrays.stream(args.getArray(IDENTIFIER_COLUMNS_ORDINAL).array())
+              .map(column -> column.toString())
+              .toArray(String[]::new);
+    }
+
+    if (identifierColumns.length == 0) {
+      Identifier tableIdent =
+          toIdentifier(args.getString(TABLE_NAME_ORDINAL), PARAMETERS[TABLE_NAME_ORDINAL].name());
+      Table table = loadSparkTable(tableIdent).table();
+      identifierColumns = table.schema().identifierFieldNames().toArray(new String[0]);
+    }
+
+    return identifierColumns;
+  }
+
+  private Dataset<Row> changelogRecords(String tableName, Map<String, String> readOptions) {

Review Comment:
   This seems fairly generic. What about accepting `Identifier` and moving this to `BaseProcedure`?
   
   ```
   protected Dataset<Row> loadTable(Identifier tableIdent, Map<String, String> options) {
     String tableName = Spark3Util.quotedFullIdentifier(tableCatalog().name(), tableIdent);
     return spark().read().options(options).table(tableName);
   }
   ```



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.ChangelogIterator;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.spark.api.java.function.MapPartitionsFunction;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.jetbrains.annotations.NotNull;
+import scala.runtime.BoxedUnit;
+
+/**
+ * A procedure that creates a view for changed rows.
+ *
+ * <p>The procedure removes the carry-over rows by default. If you want to keep them, you can set
+ * "remove_carryovers" to be false in the options.
+ *
+ * <p>The procedure doesn't compute the pre/post update images by default. If you want to compute
+ * them, you can set "compute_updates" to be true in the options.
+ *
+ * <p>Carry-over rows are the result of a removal and insertion of the same row within an operation
+ * because of the copy-on-write mechanism. For example, given a file which contains row1 (id=1,
+ * data='a') and row2 (id=2, data='b'). A copy-on-write delete of row2 would require erasing this
+ * file and preserving row1 in a new file. The changelog table would report this as (id=1, data='a',
+ * op='DELETE') and (id=1, data='a', op='INSERT'), despite it not being an actual change to the
+ * table. The procedure finds the carry-over rows and removes them from the result.
+ *
+ * <p>Pre/post update images are converted from a pair of a delete row and an insert row. Identifier
+ * columns are used for determining whether an insert and a delete record refer to the same row. If
+ * the two records share the same values for the identity columns they are considered to be before
+ * and after states of the same row. You can either set identifier fields in the table schema or
+ * input them as the procedure parameters. Here is an example of pre/post update images with an
+ * identifier column(id). A pair of a delete row and an insert row with the same id:
+ *
+ * <ul>
+ *   <li>(id=1, data='a', op='DELETE')
+ *   <li>(id=1, data='b', op='INSERT')
+ * </ul>
+ *
+ * <p>will be marked as pre/post update images:
+ *
+ * <ul>
+ *   <li>(id=1, data='a', op='UPDATE_BEFORE')
+ *   <li>(id=1, data='b', op='UPDATE_AFTER')
+ * </ul>
+ */
+public class CreateChangelogViewProcedure extends BaseProcedure {
+
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {
+        ProcedureParameter.required("table", DataTypes.StringType),
+        ProcedureParameter.optional("changelog_view", DataTypes.StringType),
+        ProcedureParameter.optional("options", STRING_MAP),
+        ProcedureParameter.optional("compute_updates", DataTypes.BooleanType),
+        ProcedureParameter.optional("remove_carryovers", DataTypes.BooleanType),
+        ProcedureParameter.optional("identifier_columns", STRING_ARRAY),
+      };
+
+  private static final int TABLE_NAME_ORDINAL = 0;
+  private static final int CHANGELOG_VIEW_NAME_ORDINAL = 1;
+  private static final int OPTIONS_ORDINAL = 2;
+  private static final int COMPUTE_UPDATES_ORDINAL = 3;
+  private static final int REMOVE_CARRYOVERS_ORDINAL = 4;
+  private static final int IDENTIFIER_COLUMNS_ORDINAL = 5;
+
+  private static final StructType OUTPUT_TYPE =
+      new StructType(
+          new StructField[] {
+            new StructField("changelog_view", DataTypes.StringType, false, Metadata.empty())
+          });
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<CreateChangelogViewProcedure>() {
+      @Override
+      protected CreateChangelogViewProcedure doBuild() {
+        return new CreateChangelogViewProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private CreateChangelogViewProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    Identifier tableIdent =
+        toIdentifier(args.getString(TABLE_NAME_ORDINAL), PARAMETERS[TABLE_NAME_ORDINAL].name());
+
+    // Read data from the table.changes
+    Dataset<Row> df = changelogRecords(changelogTableName(tableIdent), readOptions(args));
+
+    if (computeUpdateImages(args)) {

Review Comment:
   nit: What about calling this `shouldComputeUpdateImages` and adding a separate `computeUpdateImages` method that applies the transformation? I like what you did for carryovers.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.ChangelogIterator;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.spark.api.java.function.MapPartitionsFunction;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.jetbrains.annotations.NotNull;
+import scala.runtime.BoxedUnit;
+
+/**
+ * A procedure that creates a view for changed rows.
+ *
+ * <p>The procedure removes the carry-over rows by default. If you want to keep them, you can set
+ * the "remove_carryovers" procedure parameter to false.
+ *
+ * <p>The procedure computes the pre/post update images only when the "compute_updates" parameter
+ * is true or, if that parameter is omitted, when "identifier_columns" is provided.
+ *
+ * <p>Carry-over rows are the result of a removal and insertion of the same row within an operation
+ * because of the copy-on-write mechanism. For example, given a file which contains row1 (id=1,
+ * data='a') and row2 (id=2, data='b'). A copy-on-write delete of row2 would require erasing this
+ * file and preserving row1 in a new file. The changelog table would report this as (id=1, data='a',
+ * op='DELETE') and (id=1, data='a', op='INSERT'), despite it not being an actual change to the
+ * table. The procedure finds the carry-over rows and removes them from the result.
+ *
+ * <p>Pre/post update images are converted from a pair of a delete row and an insert row. Identifier
+ * columns are used for determining whether an insert and a delete record refer to the same row. If
+ * the two records share the same values for the identifier columns, they are considered to be the
+ * before and after states of the same row. You can either set identifier fields in the table
+ * schema or pass them via the "identifier_columns" procedure parameter. Here is an example of
+ * pre/post update images with an identifier column (id). A pair of a delete row and an insert row
+ * with the same id:
+ *
+ * <ul>
+ *   <li>(id=1, data='a', op='DELETE')
+ *   <li>(id=1, data='b', op='INSERT')
+ * </ul>
+ *
+ * <p>will be marked as pre/post update images:
+ *
+ * <ul>
+ *   <li>(id=1, data='a', op='UPDATE_BEFORE')
+ *   <li>(id=1, data='b', op='UPDATE_AFTER')
+ * </ul>
+ */
+public class CreateChangelogViewProcedure extends BaseProcedure {
+
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {
+        ProcedureParameter.required("table", DataTypes.StringType),
+        ProcedureParameter.optional("changelog_view", DataTypes.StringType),
+        ProcedureParameter.optional("options", STRING_MAP),
+        ProcedureParameter.optional("compute_updates", DataTypes.BooleanType),
+        ProcedureParameter.optional("remove_carryovers", DataTypes.BooleanType),
+        ProcedureParameter.optional("identifier_columns", STRING_ARRAY),
+      };
+
+  private static final int TABLE_NAME_ORDINAL = 0;
+  private static final int CHANGELOG_VIEW_NAME_ORDINAL = 1;
+  private static final int OPTIONS_ORDINAL = 2;
+  private static final int COMPUTE_UPDATES_ORDINAL = 3;
+  private static final int REMOVE_CARRYOVERS_ORDINAL = 4;
+  private static final int IDENTIFIER_COLUMNS_ORDINAL = 5;
+
+  private static final StructType OUTPUT_TYPE =
+      new StructType(
+          new StructField[] {
+            new StructField("changelog_view", DataTypes.StringType, false, Metadata.empty())
+          });
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<CreateChangelogViewProcedure>() {
+      @Override
+      protected CreateChangelogViewProcedure doBuild() {
+        return new CreateChangelogViewProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private CreateChangelogViewProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    Identifier tableIdent =
+        toIdentifier(args.getString(TABLE_NAME_ORDINAL), PARAMETERS[TABLE_NAME_ORDINAL].name());
+
+    // Read data from the table.changes
+    Dataset<Row> df = changelogRecords(changelogTableName(tableIdent), readOptions(args));
+
+    if (computeUpdateImages(args)) {
+      String[] identifierColumns = identifierColumns(args);
+
+      Preconditions.checkArgument(
+          identifierColumns.length > 0,
+          "Cannot compute the update-rows because identifier columns are not set");
+
+      Column[] repartitionColumns = getRepartitionExpr(df, identifierColumns);
+      df = transform(df, repartitionColumns);
+    } else if (removeCarryoverRows(args)) {
+      df = removeCarryoverRows(df);
+    }
+
+    String viewName = viewName(args, tableIdent.name());
+
+    df.createOrReplaceTempView(viewName);
+
+    return toOutputRows(viewName);
+  }
+
+  private boolean computeUpdateImages(InternalRow args) {
+    if (!args.isNullAt(COMPUTE_UPDATES_ORDINAL)) {
+      return args.getBoolean(COMPUTE_UPDATES_ORDINAL);
+    }
+
+    // If the identifier columns are set, we compute pre/post update images by default.
+    if (!args.isNullAt(IDENTIFIER_COLUMNS_ORDINAL)) {
+      return true;
+    }
+
+    return false;
+  }
+
+  private boolean removeCarryoverRows(InternalRow args) {
+    return args.isNullAt(REMOVE_CARRYOVERS_ORDINAL)
+        ? true
+        : args.getBoolean(REMOVE_CARRYOVERS_ORDINAL);
+  }
+
+  private Dataset<Row> removeCarryoverRows(Dataset<Row> df) {
+    Column[] repartitionColumns =
+        Arrays.stream(df.columns())
+            .filter(c -> !c.equals(MetadataColumns.CHANGE_TYPE.name()))
+            .map(df::col)
+            .toArray(Column[]::new);
+    return transform(df, repartitionColumns);
+  }
+
+  private String[] identifierColumns(InternalRow args) {
+    String[] identifierColumns = new String[0];
+
+    if (!args.isNullAt(IDENTIFIER_COLUMNS_ORDINAL)) {
+      identifierColumns =
+          Arrays.stream(args.getArray(IDENTIFIER_COLUMNS_ORDINAL).array())
+              .map(column -> column.toString())
+              .toArray(String[]::new);
+    }
+
+    if (identifierColumns.length == 0) {
+      Identifier tableIdent =
+          toIdentifier(args.getString(TABLE_NAME_ORDINAL), PARAMETERS[TABLE_NAME_ORDINAL].name());
+      Table table = loadSparkTable(tableIdent).table();
+      identifierColumns = table.schema().identifierFieldNames().toArray(new String[0]);
+    }
+
+    return identifierColumns;
+  }
+
+  private Dataset<Row> changelogRecords(String tableName, Map<String, String> readOptions) {
+    // no need to validate the read options here since the reader will validate them
+    return spark().read().options(readOptions).table(tableName);
+  }
+
+  private String changelogTableName(Identifier tableIdent) {
+    List<String> namespace = Lists.newArrayList();
+    namespace.addAll(Arrays.asList(tableIdent.namespace()));
+    namespace.add(tableIdent.name());
+    Identifier changelogTableIdent =
+        Identifier.of(namespace.toArray(new String[0]), SparkChangelogTable.TABLE_NAME);
+    return Spark3Util.quotedFullIdentifier(tableCatalog().name(), changelogTableIdent);
+  }
+
+  private Map<String, String> readOptions(InternalRow args) {
+    Map<String, String> options = Maps.newHashMap();
+
+    if (!args.isNullAt(OPTIONS_ORDINAL)) {
+      args.getMap(OPTIONS_ORDINAL)
+          .foreach(
+              DataTypes.StringType,
+              DataTypes.StringType,
+              (k, v) -> {
+                options.put(k.toString(), v.toString());
+                return BoxedUnit.UNIT;
+              });
+    }
+
+    return options;
+  }
+
+  @NotNull

Review Comment:
   What are these annotations? I don't think we ever used them before.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.ChangelogIterator;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.spark.api.java.function.MapPartitionsFunction;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.jetbrains.annotations.NotNull;
+import scala.runtime.BoxedUnit;
+
+/**
+ * A procedure that creates a view for changed rows.
+ *
+ * <p>The procedure removes the carry-over rows by default. If you want to keep them, you can set
+ * the "remove_carryovers" procedure parameter to false.
+ *
+ * <p>The procedure computes the pre/post update images only when the "compute_updates" parameter
+ * is true or, if that parameter is omitted, when "identifier_columns" is provided.
+ *
+ * <p>Carry-over rows are the result of a removal and insertion of the same row within an operation
+ * because of the copy-on-write mechanism. For example, given a file which contains row1 (id=1,
+ * data='a') and row2 (id=2, data='b'). A copy-on-write delete of row2 would require erasing this
+ * file and preserving row1 in a new file. The changelog table would report this as (id=1, data='a',
+ * op='DELETE') and (id=1, data='a', op='INSERT'), despite it not being an actual change to the
+ * table. The procedure finds the carry-over rows and removes them from the result.
+ *
+ * <p>Pre/post update images are converted from a pair of a delete row and an insert row. Identifier
+ * columns are used for determining whether an insert and a delete record refer to the same row. If
+ * the two records share the same values for the identifier columns, they are considered to be the
+ * before and after states of the same row. You can either set identifier fields in the table
+ * schema or pass them via the "identifier_columns" procedure parameter. Here is an example of
+ * pre/post update images with an identifier column (id). A pair of a delete row and an insert row
+ * with the same id:
+ *
+ * <ul>
+ *   <li>(id=1, data='a', op='DELETE')
+ *   <li>(id=1, data='b', op='INSERT')
+ * </ul>
+ *
+ * <p>will be marked as pre/post update images:
+ *
+ * <ul>
+ *   <li>(id=1, data='a', op='UPDATE_BEFORE')
+ *   <li>(id=1, data='b', op='UPDATE_AFTER')
+ * </ul>
+ */
+public class CreateChangelogViewProcedure extends BaseProcedure {
+
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {
+        ProcedureParameter.required("table", DataTypes.StringType),
+        ProcedureParameter.optional("changelog_view", DataTypes.StringType),
+        ProcedureParameter.optional("options", STRING_MAP),
+        ProcedureParameter.optional("compute_updates", DataTypes.BooleanType),
+        ProcedureParameter.optional("remove_carryovers", DataTypes.BooleanType),
+        ProcedureParameter.optional("identifier_columns", STRING_ARRAY),
+      };
+
+  private static final int TABLE_NAME_ORDINAL = 0;
+  private static final int CHANGELOG_VIEW_NAME_ORDINAL = 1;
+  private static final int OPTIONS_ORDINAL = 2;
+  private static final int COMPUTE_UPDATES_ORDINAL = 3;
+  private static final int REMOVE_CARRYOVERS_ORDINAL = 4;
+  private static final int IDENTIFIER_COLUMNS_ORDINAL = 5;
+
+  private static final StructType OUTPUT_TYPE =
+      new StructType(
+          new StructField[] {
+            new StructField("changelog_view", DataTypes.StringType, false, Metadata.empty())
+          });
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<CreateChangelogViewProcedure>() {
+      @Override
+      protected CreateChangelogViewProcedure doBuild() {
+        return new CreateChangelogViewProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private CreateChangelogViewProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    Identifier tableIdent =
+        toIdentifier(args.getString(TABLE_NAME_ORDINAL), PARAMETERS[TABLE_NAME_ORDINAL].name());
+
+    // Read data from the table.changes
+    Dataset<Row> df = changelogRecords(changelogTableName(tableIdent), readOptions(args));
+
+    if (computeUpdateImages(args)) {
+      String[] identifierColumns = identifierColumns(args);
+
+      Preconditions.checkArgument(
+          identifierColumns.length > 0,
+          "Cannot compute the update-rows because identifier columns are not set");
+
+      Column[] repartitionColumns = getRepartitionExpr(df, identifierColumns);
+      df = transform(df, repartitionColumns);
+    } else if (removeCarryoverRows(args)) {

Review Comment:
   nit: What about renaming this as `shouldRemoveCarryoverRows` to distinguish from the method that actually removes carryovers? Sorry I overlooked it earlier.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.ChangelogIterator;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.spark.api.java.function.MapPartitionsFunction;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.jetbrains.annotations.NotNull;
+import scala.runtime.BoxedUnit;
+
+/**
+ * A procedure that creates a view for changed rows.
+ *
+ * <p>The procedure removes the carry-over rows by default. If you want to keep them, you can set
+ * the "remove_carryovers" procedure parameter to false.
+ *
+ * <p>The procedure computes the pre/post update images only when the "compute_updates" parameter
+ * is true or, if that parameter is omitted, when "identifier_columns" is provided.
+ *
+ * <p>Carry-over rows are the result of a removal and insertion of the same row within an operation
+ * because of the copy-on-write mechanism. For example, given a file which contains row1 (id=1,
+ * data='a') and row2 (id=2, data='b'). A copy-on-write delete of row2 would require erasing this
+ * file and preserving row1 in a new file. The changelog table would report this as (id=1, data='a',
+ * op='DELETE') and (id=1, data='a', op='INSERT'), despite it not being an actual change to the
+ * table. The procedure finds the carry-over rows and removes them from the result.
+ *
+ * <p>Pre/post update images are converted from a pair of a delete row and an insert row. Identifier
+ * columns are used for determining whether an insert and a delete record refer to the same row. If
+ * the two records share the same values for the identifier columns, they are considered to be the
+ * before and after states of the same row. You can either set identifier fields in the table
+ * schema or pass them via the "identifier_columns" procedure parameter. Here is an example of
+ * pre/post update images with an identifier column (id). A pair of a delete row and an insert row
+ * with the same id:
+ *
+ * <ul>
+ *   <li>(id=1, data='a', op='DELETE')
+ *   <li>(id=1, data='b', op='INSERT')
+ * </ul>
+ *
+ * <p>will be marked as pre/post update images:
+ *
+ * <ul>
+ *   <li>(id=1, data='a', op='UPDATE_BEFORE')
+ *   <li>(id=1, data='b', op='UPDATE_AFTER')
+ * </ul>
+ */
+public class CreateChangelogViewProcedure extends BaseProcedure {
+
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {
+        ProcedureParameter.required("table", DataTypes.StringType),
+        ProcedureParameter.optional("changelog_view", DataTypes.StringType),
+        ProcedureParameter.optional("options", STRING_MAP),
+        ProcedureParameter.optional("compute_updates", DataTypes.BooleanType),
+        ProcedureParameter.optional("remove_carryovers", DataTypes.BooleanType),
+        ProcedureParameter.optional("identifier_columns", STRING_ARRAY),
+      };
+
+  private static final int TABLE_NAME_ORDINAL = 0;
+  private static final int CHANGELOG_VIEW_NAME_ORDINAL = 1;
+  private static final int OPTIONS_ORDINAL = 2;
+  private static final int COMPUTE_UPDATES_ORDINAL = 3;
+  private static final int REMOVE_CARRYOVERS_ORDINAL = 4;
+  private static final int IDENTIFIER_COLUMNS_ORDINAL = 5;
+
+  private static final StructType OUTPUT_TYPE =
+      new StructType(
+          new StructField[] {
+            new StructField("changelog_view", DataTypes.StringType, false, Metadata.empty())
+          });
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<CreateChangelogViewProcedure>() {
+      @Override
+      protected CreateChangelogViewProcedure doBuild() {
+        return new CreateChangelogViewProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private CreateChangelogViewProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    Identifier tableIdent =
+        toIdentifier(args.getString(TABLE_NAME_ORDINAL), PARAMETERS[TABLE_NAME_ORDINAL].name());
+
+    // Read data from the table.changes
+    Dataset<Row> df = changelogRecords(changelogTableName(tableIdent), readOptions(args));
+
+    if (computeUpdateImages(args)) {
+      String[] identifierColumns = identifierColumns(args);
+
+      Preconditions.checkArgument(
+          identifierColumns.length > 0,
+          "Cannot compute the update-rows because identifier columns are not set");
+
+      Column[] repartitionColumns = getRepartitionExpr(df, identifierColumns);
+      df = transform(df, repartitionColumns);
+    } else if (removeCarryoverRows(args)) {
+      df = removeCarryoverRows(df);
+    }
+
+    String viewName = viewName(args, tableIdent.name());
+
+    df.createOrReplaceTempView(viewName);
+
+    return toOutputRows(viewName);
+  }
+
+  private boolean computeUpdateImages(InternalRow args) {
+    if (!args.isNullAt(COMPUTE_UPDATES_ORDINAL)) {
+      return args.getBoolean(COMPUTE_UPDATES_ORDINAL);
+    }
+
+    // If the identifier columns are set, we compute pre/post update images by default.
+    if (!args.isNullAt(IDENTIFIER_COLUMNS_ORDINAL)) {

Review Comment:
   What about simplifying like this?
   
   ```
   // If the identifier columns are set, we compute pre/post update images by default.
   return !args.isNullAt(IDENTIFIER_COLUMNS_ORDINAL);
   ```



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.ChangelogIterator;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.spark.api.java.function.MapPartitionsFunction;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.jetbrains.annotations.NotNull;
+import scala.runtime.BoxedUnit;
+
+/**
+ * A procedure that creates a view for changed rows.
+ *
+ * <p>The procedure removes the carry-over rows by default. If you want to keep them, you can set
+ * the "remove_carryovers" procedure parameter to false.
+ *
+ * <p>The procedure computes the pre/post update images only when the "compute_updates" parameter
+ * is true or, if that parameter is omitted, when "identifier_columns" is provided.
+ *
+ * <p>Carry-over rows are the result of a removal and insertion of the same row within an operation
+ * because of the copy-on-write mechanism. For example, given a file which contains row1 (id=1,
+ * data='a') and row2 (id=2, data='b'). A copy-on-write delete of row2 would require erasing this
+ * file and preserving row1 in a new file. The changelog table would report this as (id=1, data='a',
+ * op='DELETE') and (id=1, data='a', op='INSERT'), despite it not being an actual change to the
+ * table. The procedure finds the carry-over rows and removes them from the result.
+ *
+ * <p>Pre/post update images are converted from a pair of a delete row and an insert row. Identifier
+ * columns are used for determining whether an insert and a delete record refer to the same row. If
+ * the two records share the same values for the identifier columns, they are considered to be the
+ * before and after states of the same row. You can either set identifier fields in the table
+ * schema or pass them via the "identifier_columns" procedure parameter. Here is an example of
+ * pre/post update images with an identifier column (id). A pair of a delete row and an insert row
+ * with the same id:
+ *
+ * <ul>
+ *   <li>(id=1, data='a', op='DELETE')
+ *   <li>(id=1, data='b', op='INSERT')
+ * </ul>
+ *
+ * <p>will be marked as pre/post update images:
+ *
+ * <ul>
+ *   <li>(id=1, data='a', op='UPDATE_BEFORE')
+ *   <li>(id=1, data='b', op='UPDATE_AFTER')
+ * </ul>
+ */
+public class CreateChangelogViewProcedure extends BaseProcedure {
+
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {
+        ProcedureParameter.required("table", DataTypes.StringType),
+        ProcedureParameter.optional("changelog_view", DataTypes.StringType),
+        ProcedureParameter.optional("options", STRING_MAP),
+        ProcedureParameter.optional("compute_updates", DataTypes.BooleanType),
+        ProcedureParameter.optional("remove_carryovers", DataTypes.BooleanType),
+        ProcedureParameter.optional("identifier_columns", STRING_ARRAY),
+      };
+
+  private static final int TABLE_NAME_ORDINAL = 0;
+  private static final int CHANGELOG_VIEW_NAME_ORDINAL = 1;
+  private static final int OPTIONS_ORDINAL = 2;
+  private static final int COMPUTE_UPDATES_ORDINAL = 3;
+  private static final int REMOVE_CARRYOVERS_ORDINAL = 4;
+  private static final int IDENTIFIER_COLUMNS_ORDINAL = 5;
+
+  private static final StructType OUTPUT_TYPE =
+      new StructType(
+          new StructField[] {
+            new StructField("changelog_view", DataTypes.StringType, false, Metadata.empty())
+          });
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<CreateChangelogViewProcedure>() {
+      @Override
+      protected CreateChangelogViewProcedure doBuild() {
+        return new CreateChangelogViewProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private CreateChangelogViewProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    Identifier tableIdent =
+        toIdentifier(args.getString(TABLE_NAME_ORDINAL), PARAMETERS[TABLE_NAME_ORDINAL].name());
+
+    // Read data from the table.changes
+    Dataset<Row> df = changelogRecords(changelogTableName(tableIdent), readOptions(args));
+
+    if (computeUpdateImages(args)) {
+      String[] identifierColumns = identifierColumns(args);
+
+      Preconditions.checkArgument(
+          identifierColumns.length > 0,
+          "Cannot compute the update-rows because identifier columns are not set");
+
+      Column[] repartitionColumns = getRepartitionExpr(df, identifierColumns);
+      df = transform(df, repartitionColumns);
+    } else if (removeCarryoverRows(args)) {
+      df = removeCarryoverRows(df);
+    }
+
+    String viewName = viewName(args, tableIdent.name());
+
+    df.createOrReplaceTempView(viewName);
+
+    return toOutputRows(viewName);
+  }
+
+  private boolean computeUpdateImages(InternalRow args) {
+    if (!args.isNullAt(COMPUTE_UPDATES_ORDINAL)) {
+      return args.getBoolean(COMPUTE_UPDATES_ORDINAL);
+    }
+
+    // If the identifier columns are set, we compute pre/post update images by default.
+    if (!args.isNullAt(IDENTIFIER_COLUMNS_ORDINAL)) {
+      return true;
+    }
+
+    return false;
+  }
+
+  private boolean removeCarryoverRows(InternalRow args) {
+    return args.isNullAt(REMOVE_CARRYOVERS_ORDINAL)

Review Comment:
   What about this? I like ternary operators but Spotless formats them in a weird way if they require multiple lines.
   ```
   return args.isNullAt(REMOVE_CARRYOVERS_ORDINAL) || args.getBoolean(REMOVE_CARRYOVERS_ORDINAL);
   ```



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.ChangelogIterator;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.spark.api.java.function.MapPartitionsFunction;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.jetbrains.annotations.NotNull;
+import scala.runtime.BoxedUnit;
+
+/**
+ * A procedure that creates a view for changed rows.
+ *
+ * <p>The procedure removes the carry-over rows by default. If you want to keep them, you can set
+ * the "remove_carryovers" procedure parameter to false.
+ *
+ * <p>The procedure doesn't compute the pre/post update images by default, unless the
+ * "identifier_columns" parameter is set. If you want to compute them, you can set the
+ * "compute_updates" procedure parameter to true.
+ *
+ * <p>Carry-over rows are the result of a removal and insertion of the same row within an operation
+ * because of the copy-on-write mechanism. For example, given a file which contains row1 (id=1,
+ * data='a') and row2 (id=2, data='b'). A copy-on-write delete of row2 would require erasing this
+ * file and preserving row1 in a new file. The changelog table would report this as (id=1, data='a',
+ * op='DELETE') and (id=1, data='a', op='INSERT'), despite it not being an actual change to the
+ * table. The procedure finds the carry-over rows and removes them from the result.
+ *
+ * <p>Pre/post update images are converted from a pair of a delete row and an insert row. Identifier
+ * columns are used for determining whether an insert and a delete record refer to the same row. If
+ * the two records share the same values for the identifier columns they are considered to be before
+ * and after states of the same row. You can either set identifier fields in the table schema or
+ * input them as the procedure parameters. Here is an example of pre/post update images with an
+ * identifier column(id). A pair of a delete row and an insert row with the same id:
+ *
+ * <ul>
+ *   <li>(id=1, data='a', op='DELETE')
+ *   <li>(id=1, data='b', op='INSERT')
+ * </ul>
+ *
+ * <p>will be marked as pre/post update images:
+ *
+ * <ul>
+ *   <li>(id=1, data='a', op='UPDATE_BEFORE')
+ *   <li>(id=1, data='b', op='UPDATE_AFTER')
+ * </ul>
+ */
+public class CreateChangelogViewProcedure extends BaseProcedure {
+
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {
+        ProcedureParameter.required("table", DataTypes.StringType),
+        ProcedureParameter.optional("changelog_view", DataTypes.StringType),
+        ProcedureParameter.optional("options", STRING_MAP),
+        ProcedureParameter.optional("compute_updates", DataTypes.BooleanType),
+        ProcedureParameter.optional("remove_carryovers", DataTypes.BooleanType),
+        ProcedureParameter.optional("identifier_columns", STRING_ARRAY),
+      };
+
+  private static final int TABLE_NAME_ORDINAL = 0;
+  private static final int CHANGELOG_VIEW_NAME_ORDINAL = 1;
+  private static final int OPTIONS_ORDINAL = 2;
+  private static final int COMPUTE_UPDATES_ORDINAL = 3;
+  private static final int REMOVE_CARRYOVERS_ORDINAL = 4;
+  private static final int IDENTIFIER_COLUMNS_ORDINAL = 5;
+
+  private static final StructType OUTPUT_TYPE =
+      new StructType(
+          new StructField[] {
+            new StructField("changelog_view", DataTypes.StringType, false, Metadata.empty())
+          });
+
+  /**
+   * Returns a builder whose {@code doBuild()} creates a {@link CreateChangelogViewProcedure} bound
+   * to the builder's table catalog.
+   */
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<CreateChangelogViewProcedure>() {
+      @Override
+      protected CreateChangelogViewProcedure doBuild() {
+        return new CreateChangelogViewProcedure(tableCatalog());
+      }
+    };
+  }
+
+  // Private: instances are created through builder().
+  private CreateChangelogViewProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  /** Returns the parameter definitions of this procedure ({@code PARAMETERS}). */
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  /** Returns the output schema of this procedure ({@code OUTPUT_TYPE}). */
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  /**
+   * Creates (or replaces) a temporary view over the changelog of the given table.
+   *
+   * <p>The changelog records are read with the supplied read options. If update images are
+   * requested, they are computed using the identifier columns; otherwise carry-over rows are
+   * removed unless that was disabled.
+   *
+   * @param args the procedure arguments, laid out as in {@code PARAMETERS}
+   * @return the output rows built from the name of the created view
+   * @throws IllegalArgumentException if update images are requested but no identifier columns
+   *     are available
+   */
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    String tableArg = args.getString(TABLE_NAME_ORDINAL);
+    Identifier tableIdent = toIdentifier(tableArg, PARAMETERS[TABLE_NAME_ORDINAL].name());
+
+    // Read data from the table.changes
+    String changelogTable = changelogTableName(tableIdent);
+    Dataset<Row> changes = changelogRecords(changelogTable, readOptions(args));
+
+    Dataset<Row> result;
+    if (computeUpdateImages(args)) {
+      String[] idColumns = identifierColumns(args);
+      Preconditions.checkArgument(
+          idColumns.length > 0,
+          "Cannot compute the update-rows because identifier columns are not set");
+      result = transform(changes, getRepartitionExpr(changes, idColumns));
+    } else if (removeCarryoverRows(args)) {
+      result = removeCarryoverRows(changes);
+    } else {
+      result = changes;
+    }
+
+    String changelogView = viewName(args, tableIdent.name());
+    result.createOrReplaceTempView(changelogView);
+    return toOutputRows(changelogView);
+  }
+
+  /**
+   * Decides whether pre/post update images should be computed.
+   *
+   * <p>An explicit "compute_updates" argument always wins. When it is not set, update images are
+   * computed only if the "identifier_columns" argument is provided.
+   *
+   * @param args the procedure arguments
+   * @return true if update images should be computed
+   */
+  private boolean computeUpdateImages(InternalRow args) {
+    if (!args.isNullAt(COMPUTE_UPDATES_ORDINAL)) {
+      return args.getBoolean(COMPUTE_UPDATES_ORDINAL);
+    }
+
+    // If the identifier columns are set, we compute pre/post update images by default.
+    return !args.isNullAt(IDENTIFIER_COLUMNS_ORDINAL);
+  }
+
+  /**
+   * Decides whether carry-over rows should be removed.
+   *
+   * @param args the procedure arguments
+   * @return true when "remove_carryovers" is not set (the default) or is set to true
+   */
+  private boolean removeCarryoverRows(InternalRow args) {
+    return args.isNullAt(REMOVE_CARRYOVERS_ORDINAL) || args.getBoolean(REMOVE_CARRYOVERS_ORDINAL);
+  }
+
+  /**
+   * Applies {@code transform} with every column except the change type column as the
+   * repartition columns.
+   *
+   * @param df the changelog dataset
+   * @return the transformed dataset
+   */
+  private Dataset<Row> removeCarryoverRows(Dataset<Row> df) {
+    String changeTypeColumn = MetadataColumns.CHANGE_TYPE.name();
+    List<Column> keyColumns = Lists.newArrayList();
+
+    for (String columnName : df.columns()) {
+      if (!columnName.equals(changeTypeColumn)) {
+        keyColumns.add(df.col(columnName));
+      }
+    }
+
+    return transform(df, keyColumns.toArray(new Column[0]));
+  }
+
+  /**
+   * Resolves the identifier columns used for computing update images.
+   *
+   * <p>Columns passed via the "identifier_columns" argument take precedence. If the argument is
+   * not set or is empty, the identifier field names of the table schema are used instead.
+   *
+   * @param args the procedure arguments
+   * @return the identifier column names; empty if neither source provides any
+   */
+  private String[] identifierColumns(InternalRow args) {
+    if (!args.isNullAt(IDENTIFIER_COLUMNS_ORDINAL)) {
+      String[] fromArgs =
+          Arrays.stream(args.getArray(IDENTIFIER_COLUMNS_ORDINAL).array())
+              .map(Object::toString)
+              .toArray(String[]::new);
+      if (fromArgs.length > 0) {
+        return fromArgs;
+      }
+    }
+
+    // fall back to the identifier fields declared in the table schema
+    String tableName = args.getString(TABLE_NAME_ORDINAL);
+    Identifier tableIdent = toIdentifier(tableName, PARAMETERS[TABLE_NAME_ORDINAL].name());
+    Table table = loadSparkTable(tableIdent).table();
+    return table.schema().identifierFieldNames().toArray(new String[0]);
+  }
+
+  /**
+   * Reads the given table via {@code spark().read()}, applying the supplied read options.
+   *
+   * @param tableName name of the table to read
+   * @param readOptions options forwarded unchanged to the reader
+   * @return the resulting dataset
+   */
+  private Dataset<Row> changelogRecords(String tableName, Map<String, String> readOptions) {
+    // no need to validate the read options here since the reader will validate them
+    return spark().read().options(readOptions).table(tableName);
+  }
+
+  /**
+   * Builds the quoted, catalog-qualified name of the changelog table for the given table.
+   *
+   * <p>The changelog identifier uses the source table's namespace followed by its name as the
+   * namespace, and {@code SparkChangelogTable.TABLE_NAME} as the table name.
+   *
+   * @param tableIdent identifier of the source table
+   * @return the quoted full identifier, prefixed with the catalog name
+   */
+  private String changelogTableName(Identifier tableIdent) {
+    List<String> changelogNamespace = Lists.newArrayList(tableIdent.namespace());
+    changelogNamespace.add(tableIdent.name());
+
+    String[] namespaceParts = changelogNamespace.toArray(new String[0]);
+    Identifier changelogTableIdent = Identifier.of(namespaceParts, SparkChangelogTable.TABLE_NAME);
+    return Spark3Util.quotedFullIdentifier(tableCatalog().name(), changelogTableIdent);
+  }
+
+  /**
+   * Converts the optional "options" map argument into a Java map of read options.
+   *
+   * @param args the procedure arguments
+   * @return a mutable map of the supplied options; empty when the argument is not set
+   */
+  private Map<String, String> readOptions(InternalRow args) {
+    Map<String, String> result = Maps.newHashMap();
+    if (args.isNullAt(OPTIONS_ORDINAL)) {
+      return result;
+    }
+
+    args.getMap(OPTIONS_ORDINAL)
+        .foreach(
+            DataTypes.StringType,
+            DataTypes.StringType,
+            (key, value) -> {
+              result.put(key.toString(), value.toString());
+              return BoxedUnit.UNIT;
+            });
+    return result;
+  }
+
+  /**
+   * Determines the name of the changelog view.
+   *
+   * @param args the procedure arguments
+   * @param tableName the table name the default view name is derived from
+   * @return the "changelog_view" argument if set; otherwise the part of {@code tableName} after
+   *     its last dot (or all of it if there is no dot) followed by "_changes"
+   */
+  @NotNull
+  private static String viewName(InternalRow args, String tableName) {
+    if (!args.isNullAt(CHANGELOG_VIEW_NAME_ORDINAL)) {
+      String requestedName = args.getString(CHANGELOG_VIEW_NAME_ORDINAL);
+      if (requestedName != null) {
+        return requestedName;
+      }
+    }
+
+    // lastIndexOf returns -1 when there is no dot, so substring(0) keeps the whole name
+    String shortTableName = tableName.substring(tableName.lastIndexOf(".") + 1);
+    return shortTableName + "_changes";
+  }
+
+  /**
+   * Repartitions the dataset by the given columns, sorts rows within each partition and runs each
+   * partition through {@link ChangelogIterator}.
+   *
+   * @param df the changelog dataset
+   * @param repartitionColumns columns to repartition by; their string forms are also passed to
+   *     the iterator as identifier fields
+   * @return a dataset encoded with the schema of {@code df}
+   */
+  private Dataset<Row> transform(Dataset<Row> df, Column[] repartitionColumns) {
+    Column[] sortSpec = sortSpec(df, repartitionColumns);
+    StructType schema = df.schema();
+
+    String[] identifierFields = new String[repartitionColumns.length];
+    for (int i = 0; i < repartitionColumns.length; i++) {
+      identifierFields[i] = repartitionColumns[i].toString();
+    }
+
+    MapPartitionsFunction<Row, Row> changelogFunc =
+        rowIterator -> ChangelogIterator.create(rowIterator, schema, identifierFields);
+
+    return df.repartition(repartitionColumns)
+        .sortWithinPartitions(sortSpec)
+        .mapPartitions(changelogFunc, RowEncoder.apply(schema));
+  }
+
+  @NotNull
+  private static Column[] getRepartitionExpr(Dataset<Row> df, String[] identifiers) {
+    Column[] repartitionSpec = new Column[identifiers.length + 1];
+    for (int i = 0; i < identifiers.length; i++) {
+      try {
+        repartitionSpec[i] = df.col(identifiers[i]);
+      } catch (Exception e) {

Review Comment:
   I am not sure there is a lot of value in intercepting exceptions in such cases. Spark would throw a fairly good exception and would list a set of available columns. Let's just use that. I feel this would simplify this block without impacting user experience.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.ChangelogIterator;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.spark.api.java.function.MapPartitionsFunction;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.jetbrains.annotations.NotNull;
+import scala.runtime.BoxedUnit;
+
+/**
+ * A procedure that creates a view for changed rows.
+ *
+ * <p>The procedure removes the carry-over rows by default. If you want to keep them, you can set
+ * the "remove_carryovers" procedure parameter to false.
+ *
+ * <p>The procedure doesn't compute the pre/post update images by default, unless the
+ * "identifier_columns" parameter is set. If you want to compute them, you can set the
+ * "compute_updates" procedure parameter to true.
+ *
+ * <p>Carry-over rows are the result of a removal and insertion of the same row within an operation
+ * because of the copy-on-write mechanism. For example, given a file which contains row1 (id=1,
+ * data='a') and row2 (id=2, data='b'). A copy-on-write delete of row2 would require erasing this
+ * file and preserving row1 in a new file. The changelog table would report this as (id=1, data='a',
+ * op='DELETE') and (id=1, data='a', op='INSERT'), despite it not being an actual change to the
+ * table. The procedure finds the carry-over rows and removes them from the result.
+ *
+ * <p>Pre/post update images are converted from a pair of a delete row and an insert row. Identifier
+ * columns are used for determining whether an insert and a delete record refer to the same row. If
+ * the two records share the same values for the identifier columns they are considered to be before
+ * and after states of the same row. You can either set identifier fields in the table schema or
+ * input them as the procedure parameters. Here is an example of pre/post update images with an
+ * identifier column(id). A pair of a delete row and an insert row with the same id:
+ *
+ * <ul>
+ *   <li>(id=1, data='a', op='DELETE')
+ *   <li>(id=1, data='b', op='INSERT')
+ * </ul>
+ *
+ * <p>will be marked as pre/post update images:
+ *
+ * <ul>
+ *   <li>(id=1, data='a', op='UPDATE_BEFORE')
+ *   <li>(id=1, data='b', op='UPDATE_AFTER')
+ * </ul>
+ */
+public class CreateChangelogViewProcedure extends BaseProcedure {
+
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {
+        ProcedureParameter.required("table", DataTypes.StringType),
+        ProcedureParameter.optional("changelog_view", DataTypes.StringType),
+        ProcedureParameter.optional("options", STRING_MAP),
+        ProcedureParameter.optional("compute_updates", DataTypes.BooleanType),
+        ProcedureParameter.optional("remove_carryovers", DataTypes.BooleanType),
+        ProcedureParameter.optional("identifier_columns", STRING_ARRAY),
+      };
+
+  private static final int TABLE_NAME_ORDINAL = 0;
+  private static final int CHANGELOG_VIEW_NAME_ORDINAL = 1;
+  private static final int OPTIONS_ORDINAL = 2;
+  private static final int COMPUTE_UPDATES_ORDINAL = 3;
+  private static final int REMOVE_CARRYOVERS_ORDINAL = 4;
+  private static final int IDENTIFIER_COLUMNS_ORDINAL = 5;
+
+  private static final StructType OUTPUT_TYPE =
+      new StructType(
+          new StructField[] {
+            new StructField("changelog_view", DataTypes.StringType, false, Metadata.empty())
+          });
+
+  /**
+   * Returns a builder whose {@code doBuild()} creates a {@link CreateChangelogViewProcedure} bound
+   * to the builder's table catalog.
+   */
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<CreateChangelogViewProcedure>() {
+      @Override
+      protected CreateChangelogViewProcedure doBuild() {
+        return new CreateChangelogViewProcedure(tableCatalog());
+      }
+    };
+  }
+
+  // Private: instances are created through builder().
+  private CreateChangelogViewProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  /** Returns the parameter definitions of this procedure ({@code PARAMETERS}). */
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  /** Returns the output schema of this procedure ({@code OUTPUT_TYPE}). */
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  /**
+   * Creates (or replaces) a temporary view over the changelog of the given table.
+   *
+   * <p>The changelog records are read with the supplied read options. If update images are
+   * requested, they are computed using the identifier columns; otherwise carry-over rows are
+   * removed unless that was disabled.
+   *
+   * @param args the procedure arguments, laid out as in {@code PARAMETERS}
+   * @return the output rows built from the name of the created view
+   * @throws IllegalArgumentException if update images are requested but no identifier columns
+   *     are available
+   */
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    String tableArg = args.getString(TABLE_NAME_ORDINAL);
+    Identifier tableIdent = toIdentifier(tableArg, PARAMETERS[TABLE_NAME_ORDINAL].name());
+
+    // Read data from the table.changes
+    String changelogTable = changelogTableName(tableIdent);
+    Dataset<Row> changes = changelogRecords(changelogTable, readOptions(args));
+
+    Dataset<Row> result;
+    if (computeUpdateImages(args)) {
+      String[] idColumns = identifierColumns(args);
+      Preconditions.checkArgument(
+          idColumns.length > 0,
+          "Cannot compute the update-rows because identifier columns are not set");
+      result = transform(changes, getRepartitionExpr(changes, idColumns));
+    } else if (removeCarryoverRows(args)) {
+      result = removeCarryoverRows(changes);
+    } else {
+      result = changes;
+    }
+
+    String changelogView = viewName(args, tableIdent.name());
+    result.createOrReplaceTempView(changelogView);
+    return toOutputRows(changelogView);
+  }
+
+  /**
+   * Decides whether pre/post update images should be computed.
+   *
+   * <p>An explicit "compute_updates" argument always wins. When it is not set, update images are
+   * computed only if the "identifier_columns" argument is provided.
+   *
+   * @param args the procedure arguments
+   * @return true if update images should be computed
+   */
+  private boolean computeUpdateImages(InternalRow args) {
+    if (!args.isNullAt(COMPUTE_UPDATES_ORDINAL)) {
+      return args.getBoolean(COMPUTE_UPDATES_ORDINAL);
+    }
+
+    // If the identifier columns are set, we compute pre/post update images by default.
+    return !args.isNullAt(IDENTIFIER_COLUMNS_ORDINAL);
+  }
+
+  /**
+   * Decides whether carry-over rows should be removed.
+   *
+   * @param args the procedure arguments
+   * @return true when "remove_carryovers" is not set (the default) or is set to true
+   */
+  private boolean removeCarryoverRows(InternalRow args) {
+    return args.isNullAt(REMOVE_CARRYOVERS_ORDINAL) || args.getBoolean(REMOVE_CARRYOVERS_ORDINAL);
+  }
+
+  /**
+   * Applies {@code transform} with every column except the change type column as the
+   * repartition columns.
+   *
+   * @param df the changelog dataset
+   * @return the transformed dataset
+   */
+  private Dataset<Row> removeCarryoverRows(Dataset<Row> df) {
+    String changeTypeColumn = MetadataColumns.CHANGE_TYPE.name();
+    List<Column> keyColumns = Lists.newArrayList();
+
+    for (String columnName : df.columns()) {
+      if (!columnName.equals(changeTypeColumn)) {
+        keyColumns.add(df.col(columnName));
+      }
+    }
+
+    return transform(df, keyColumns.toArray(new Column[0]));
+  }
+
+  /**
+   * Resolves the identifier columns used for computing update images.
+   *
+   * <p>Columns passed via the "identifier_columns" argument take precedence. If the argument is
+   * not set or is empty, the identifier field names of the table schema are used instead.
+   *
+   * @param args the procedure arguments
+   * @return the identifier column names; empty if neither source provides any
+   */
+  private String[] identifierColumns(InternalRow args) {
+    if (!args.isNullAt(IDENTIFIER_COLUMNS_ORDINAL)) {
+      String[] fromArgs =
+          Arrays.stream(args.getArray(IDENTIFIER_COLUMNS_ORDINAL).array())
+              .map(Object::toString)
+              .toArray(String[]::new);
+      if (fromArgs.length > 0) {
+        return fromArgs;
+      }
+    }
+
+    // fall back to the identifier fields declared in the table schema
+    String tableName = args.getString(TABLE_NAME_ORDINAL);
+    Identifier tableIdent = toIdentifier(tableName, PARAMETERS[TABLE_NAME_ORDINAL].name());
+    Table table = loadSparkTable(tableIdent).table();
+    return table.schema().identifierFieldNames().toArray(new String[0]);
+  }
+
+  /**
+   * Reads the given table via {@code spark().read()}, applying the supplied read options.
+   *
+   * @param tableName name of the table to read
+   * @param readOptions options forwarded unchanged to the reader
+   * @return the resulting dataset
+   */
+  private Dataset<Row> changelogRecords(String tableName, Map<String, String> readOptions) {
+    // no need to validate the read options here since the reader will validate them
+    return spark().read().options(readOptions).table(tableName);
+  }
+
+  /**
+   * Builds the quoted, catalog-qualified name of the changelog table for the given table.
+   *
+   * <p>The changelog identifier uses the source table's namespace followed by its name as the
+   * namespace, and {@code SparkChangelogTable.TABLE_NAME} as the table name.
+   *
+   * @param tableIdent identifier of the source table
+   * @return the quoted full identifier, prefixed with the catalog name
+   */
+  private String changelogTableName(Identifier tableIdent) {
+    List<String> changelogNamespace = Lists.newArrayList(tableIdent.namespace());
+    changelogNamespace.add(tableIdent.name());
+
+    String[] namespaceParts = changelogNamespace.toArray(new String[0]);
+    Identifier changelogTableIdent = Identifier.of(namespaceParts, SparkChangelogTable.TABLE_NAME);
+    return Spark3Util.quotedFullIdentifier(tableCatalog().name(), changelogTableIdent);
+  }
+
+  /**
+   * Converts the optional "options" map argument into a Java map of read options.
+   *
+   * @param args the procedure arguments
+   * @return a mutable map of the supplied options; empty when the argument is not set
+   */
+  private Map<String, String> readOptions(InternalRow args) {
+    Map<String, String> result = Maps.newHashMap();
+    if (args.isNullAt(OPTIONS_ORDINAL)) {
+      return result;
+    }
+
+    args.getMap(OPTIONS_ORDINAL)
+        .foreach(
+            DataTypes.StringType,
+            DataTypes.StringType,
+            (key, value) -> {
+              result.put(key.toString(), value.toString());
+              return BoxedUnit.UNIT;
+            });
+    return result;
+  }
+
+  /**
+   * Determines the name of the changelog view.
+   *
+   * @param args the procedure arguments
+   * @param tableName the table name the default view name is derived from
+   * @return the "changelog_view" argument if set; otherwise the part of {@code tableName} after
+   *     its last dot (or all of it if there is no dot) followed by "_changes"
+   */
+  @NotNull
+  private static String viewName(InternalRow args, String tableName) {
+    if (!args.isNullAt(CHANGELOG_VIEW_NAME_ORDINAL)) {
+      String requestedName = args.getString(CHANGELOG_VIEW_NAME_ORDINAL);
+      if (requestedName != null) {
+        return requestedName;
+      }
+    }
+
+    // lastIndexOf returns -1 when there is no dot, so substring(0) keeps the whole name
+    String shortTableName = tableName.substring(tableName.lastIndexOf(".") + 1);
+    return shortTableName + "_changes";
+  }
+
+  /**
+   * Repartitions the dataset by the given columns, sorts rows within each partition and runs each
+   * partition through {@link ChangelogIterator}.
+   *
+   * @param df the changelog dataset
+   * @param repartitionColumns columns to repartition by; their string forms are also passed to
+   *     the iterator as identifier fields
+   * @return a dataset encoded with the schema of {@code df}
+   */
+  private Dataset<Row> transform(Dataset<Row> df, Column[] repartitionColumns) {
+    Column[] sortSpec = sortSpec(df, repartitionColumns);
+    StructType schema = df.schema();
+
+    String[] identifierFields = new String[repartitionColumns.length];
+    for (int i = 0; i < repartitionColumns.length; i++) {
+      identifierFields[i] = repartitionColumns[i].toString();
+    }
+
+    MapPartitionsFunction<Row, Row> changelogFunc =
+        rowIterator -> ChangelogIterator.create(rowIterator, schema, identifierFields);
+
+    return df.repartition(repartitionColumns)
+        .sortWithinPartitions(sortSpec)
+        .mapPartitions(changelogFunc, RowEncoder.apply(schema));
+  }
+
+  @NotNull

Review Comment:
   Same question about the annotation.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.ChangelogIterator;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.spark.api.java.function.MapPartitionsFunction;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.jetbrains.annotations.NotNull;
+import scala.runtime.BoxedUnit;
+
+/**
+ * A procedure that creates a view for changed rows.
+ *
+ * <p>The procedure removes the carry-over rows by default. If you want to keep them, you can set
+ * the "remove_carryovers" procedure parameter to false.
+ *
+ * <p>The procedure doesn't compute the pre/post update images by default, unless the
+ * "identifier_columns" parameter is set. If you want to compute them, you can set the
+ * "compute_updates" procedure parameter to true.
+ *
+ * <p>Carry-over rows are the result of a removal and insertion of the same row within an operation
+ * because of the copy-on-write mechanism. For example, given a file which contains row1 (id=1,
+ * data='a') and row2 (id=2, data='b'). A copy-on-write delete of row2 would require erasing this
+ * file and preserving row1 in a new file. The changelog table would report this as (id=1, data='a',
+ * op='DELETE') and (id=1, data='a', op='INSERT'), despite it not being an actual change to the
+ * table. The procedure finds the carry-over rows and removes them from the result.
+ *
+ * <p>Pre/post update images are converted from a pair of a delete row and an insert row. Identifier
+ * columns are used for determining whether an insert and a delete record refer to the same row. If
+ * the two records share the same values for the identifier columns they are considered to be before
+ * and after states of the same row. You can either set identifier fields in the table schema or
+ * input them as the procedure parameters. Here is an example of pre/post update images with an
+ * identifier column(id). A pair of a delete row and an insert row with the same id:
+ *
+ * <ul>
+ *   <li>(id=1, data='a', op='DELETE')
+ *   <li>(id=1, data='b', op='INSERT')
+ * </ul>
+ *
+ * <p>will be marked as pre/post update images:
+ *
+ * <ul>
+ *   <li>(id=1, data='a', op='UPDATE_BEFORE')
+ *   <li>(id=1, data='b', op='UPDATE_AFTER')
+ * </ul>
+ */
+public class CreateChangelogViewProcedure extends BaseProcedure {
+
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {
+        ProcedureParameter.required("table", DataTypes.StringType),
+        ProcedureParameter.optional("changelog_view", DataTypes.StringType),
+        ProcedureParameter.optional("options", STRING_MAP),
+        ProcedureParameter.optional("compute_updates", DataTypes.BooleanType),
+        ProcedureParameter.optional("remove_carryovers", DataTypes.BooleanType),
+        ProcedureParameter.optional("identifier_columns", STRING_ARRAY),
+      };
+
+  private static final int TABLE_NAME_ORDINAL = 0;
+  private static final int CHANGELOG_VIEW_NAME_ORDINAL = 1;
+  private static final int OPTIONS_ORDINAL = 2;
+  private static final int COMPUTE_UPDATES_ORDINAL = 3;
+  private static final int REMOVE_CARRYOVERS_ORDINAL = 4;
+  private static final int IDENTIFIER_COLUMNS_ORDINAL = 5;
+
+  private static final StructType OUTPUT_TYPE =
+      new StructType(
+          new StructField[] {
+            new StructField("changelog_view", DataTypes.StringType, false, Metadata.empty())
+          });
+
+  /**
+   * Returns a builder whose {@code doBuild()} creates a {@link CreateChangelogViewProcedure} bound
+   * to the builder's table catalog.
+   */
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<CreateChangelogViewProcedure>() {
+      @Override
+      protected CreateChangelogViewProcedure doBuild() {
+        return new CreateChangelogViewProcedure(tableCatalog());
+      }
+    };
+  }
+
+  // Private: instances are created through builder().
+  private CreateChangelogViewProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  /** Returns the parameter definitions of this procedure ({@code PARAMETERS}). */
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  /** Returns the output schema of this procedure ({@code OUTPUT_TYPE}). */
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  /**
+   * Creates (or replaces) a temporary view over the changelog of the given table.
+   *
+   * <p>The changelog records are read with the supplied read options. If update images are
+   * requested, they are computed using the identifier columns; otherwise carry-over rows are
+   * removed unless that was disabled.
+   *
+   * @param args the procedure arguments, laid out as in {@code PARAMETERS}
+   * @return the output rows built from the name of the created view
+   * @throws IllegalArgumentException if update images are requested but no identifier columns
+   *     are available
+   */
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    String tableArg = args.getString(TABLE_NAME_ORDINAL);
+    Identifier tableIdent = toIdentifier(tableArg, PARAMETERS[TABLE_NAME_ORDINAL].name());
+
+    // Read data from the table.changes
+    String changelogTable = changelogTableName(tableIdent);
+    Dataset<Row> changes = changelogRecords(changelogTable, readOptions(args));
+
+    Dataset<Row> result;
+    if (computeUpdateImages(args)) {
+      String[] idColumns = identifierColumns(args);
+      Preconditions.checkArgument(
+          idColumns.length > 0,
+          "Cannot compute the update-rows because identifier columns are not set");
+      result = transform(changes, getRepartitionExpr(changes, idColumns));
+    } else if (removeCarryoverRows(args)) {
+      result = removeCarryoverRows(changes);
+    } else {
+      result = changes;
+    }
+
+    String changelogView = viewName(args, tableIdent.name());
+    result.createOrReplaceTempView(changelogView);
+    return toOutputRows(changelogView);
+  }
+
+  /**
+   * Decides whether pre/post update images should be computed.
+   *
+   * <p>An explicit "compute_updates" argument always wins. When it is not set, update images are
+   * computed only if the "identifier_columns" argument is provided.
+   *
+   * @param args the procedure arguments
+   * @return true if update images should be computed
+   */
+  private boolean computeUpdateImages(InternalRow args) {
+    if (!args.isNullAt(COMPUTE_UPDATES_ORDINAL)) {
+      return args.getBoolean(COMPUTE_UPDATES_ORDINAL);
+    }
+
+    // If the identifier columns are set, we compute pre/post update images by default.
+    return !args.isNullAt(IDENTIFIER_COLUMNS_ORDINAL);
+  }
+
+  /**
+   * Decides whether carry-over rows should be removed.
+   *
+   * @param args the procedure arguments
+   * @return true when "remove_carryovers" is not set (the default) or is set to true
+   */
+  private boolean removeCarryoverRows(InternalRow args) {
+    return args.isNullAt(REMOVE_CARRYOVERS_ORDINAL) || args.getBoolean(REMOVE_CARRYOVERS_ORDINAL);
+  }
+
+  /**
+   * Applies {@code transform} with every column except the change type column as the
+   * repartition columns.
+   *
+   * @param df the changelog dataset
+   * @return the transformed dataset
+   */
+  private Dataset<Row> removeCarryoverRows(Dataset<Row> df) {
+    String changeTypeColumn = MetadataColumns.CHANGE_TYPE.name();
+    List<Column> keyColumns = Lists.newArrayList();
+
+    for (String columnName : df.columns()) {
+      if (!columnName.equals(changeTypeColumn)) {
+        keyColumns.add(df.col(columnName));
+      }
+    }
+
+    return transform(df, keyColumns.toArray(new Column[0]));
+  }
+
+  /**
+   * Resolves the identifier columns used for computing update images.
+   *
+   * <p>Columns passed via the "identifier_columns" argument take precedence. If the argument is
+   * not set or is empty, the identifier field names of the table schema are used instead.
+   *
+   * @param args the procedure arguments
+   * @return the identifier column names; empty if neither source provides any
+   */
+  private String[] identifierColumns(InternalRow args) {
+    if (!args.isNullAt(IDENTIFIER_COLUMNS_ORDINAL)) {
+      String[] fromArgs =
+          Arrays.stream(args.getArray(IDENTIFIER_COLUMNS_ORDINAL).array())
+              .map(Object::toString)
+              .toArray(String[]::new);
+      if (fromArgs.length > 0) {
+        return fromArgs;
+      }
+    }
+
+    // fall back to the identifier fields declared in the table schema
+    String tableName = args.getString(TABLE_NAME_ORDINAL);
+    Identifier tableIdent = toIdentifier(tableName, PARAMETERS[TABLE_NAME_ORDINAL].name());
+    Table table = loadSparkTable(tableIdent).table();
+    return table.schema().identifierFieldNames().toArray(new String[0]);
+  }
+
+  /**
+   * Reads the given table via {@code spark().read()}, applying the supplied read options.
+   *
+   * @param tableName name of the table to read
+   * @param readOptions options forwarded unchanged to the reader
+   * @return the resulting dataset
+   */
+  private Dataset<Row> changelogRecords(String tableName, Map<String, String> readOptions) {
+    // no need to validate the read options here since the reader will validate them
+    return spark().read().options(readOptions).table(tableName);
+  }
+
+  /**
+   * Builds the quoted, catalog-qualified name of the changelog table for the given table.
+   *
+   * <p>The changelog identifier uses the source table's namespace followed by its name as the
+   * namespace, and {@code SparkChangelogTable.TABLE_NAME} as the table name.
+   *
+   * @param tableIdent identifier of the source table
+   * @return the quoted full identifier, prefixed with the catalog name
+   */
+  private String changelogTableName(Identifier tableIdent) {
+    List<String> changelogNamespace = Lists.newArrayList(tableIdent.namespace());
+    changelogNamespace.add(tableIdent.name());
+
+    String[] namespaceParts = changelogNamespace.toArray(new String[0]);
+    Identifier changelogTableIdent = Identifier.of(namespaceParts, SparkChangelogTable.TABLE_NAME);
+    return Spark3Util.quotedFullIdentifier(tableCatalog().name(), changelogTableIdent);
+  }
+
+  /**
+   * Converts the optional "options" map argument into a Java map of read options.
+   *
+   * @param args the procedure arguments
+   * @return a mutable map of the supplied options; empty when the argument is not set
+   */
+  private Map<String, String> readOptions(InternalRow args) {
+    Map<String, String> result = Maps.newHashMap();
+    if (args.isNullAt(OPTIONS_ORDINAL)) {
+      return result;
+    }
+
+    args.getMap(OPTIONS_ORDINAL)
+        .foreach(
+            DataTypes.StringType,
+            DataTypes.StringType,
+            (key, value) -> {
+              result.put(key.toString(), value.toString());
+              return BoxedUnit.UNIT;
+            });
+    return result;
+  }
+
+  /**
+   * Determines the name of the changelog view.
+   *
+   * @param args the procedure arguments
+   * @param tableName the table name the default view name is derived from
+   * @return the "changelog_view" argument if set; otherwise the part of {@code tableName} after
+   *     its last dot (or all of it if there is no dot) followed by "_changes"
+   */
+  @NotNull
+  private static String viewName(InternalRow args, String tableName) {
+    if (!args.isNullAt(CHANGELOG_VIEW_NAME_ORDINAL)) {
+      String requestedName = args.getString(CHANGELOG_VIEW_NAME_ORDINAL);
+      if (requestedName != null) {
+        return requestedName;
+      }
+    }
+
+    // lastIndexOf returns -1 when there is no dot, so substring(0) keeps the whole name
+    String shortTableName = tableName.substring(tableName.lastIndexOf(".") + 1);
+    return shortTableName + "_changes";
+  }
+
+  private Dataset<Row> transform(Dataset<Row> df, Column[] repartitionColumns) {

Review Comment:
   What about calling it `applyChangelogIterator` or something?



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.ChangelogIterator;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.spark.api.java.function.MapPartitionsFunction;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.jetbrains.annotations.NotNull;
+import scala.runtime.BoxedUnit;
+
+/**
+ * A procedure that creates a view for changed rows.
+ *
+ * <p>The procedure removes the carry-over rows by default. If you want to keep them, you can set
+ * the "remove_carryovers" procedure argument to false.
+ *
+ * <p>By default, the procedure computes the pre/post update images only when the
+ * "identifier_columns" argument is set. To override that default, set the "compute_updates"
+ * procedure argument to true or false.
+ *
+ * <p>Carry-over rows are the result of a removal and insertion of the same row within an operation
+ * because of the copy-on-write mechanism. For example, given a file which contains row1 (id=1,
+ * data='a') and row2 (id=2, data='b'). A copy-on-write delete of row2 would require erasing this
+ * file and preserving row1 in a new file. The changelog table would report this as (id=1, data='a',
+ * op='DELETE') and (id=1, data='a', op='INSERT'), despite it not being an actual change to the
+ * table. The procedure finds the carry-over rows and removes them from the result.
+ *
+ * <p>Pre/post update images are converted from a pair of a delete row and an insert row. Identifier
+ * columns are used for determining whether an insert and a delete record refer to the same row. If
+ * the two records share the same values for the identity columns they are considered to be before
+ * and after states of the same row. You can either set identifier fields in the table schema or
+ * input them as the procedure parameters. Here is an example of pre/post update images with an
+ * identifier column(id). A pair of a delete row and an insert row with the same id:
+ *
+ * <ul>
+ *   <li>(id=1, data='a', op='DELETE')
+ *   <li>(id=1, data='b', op='INSERT')
+ * </ul>
+ *
+ * <p>will be marked as pre/post update images:
+ *
+ * <ul>
+ *   <li>(id=1, data='a', op='UPDATE_BEFORE')
+ *   <li>(id=1, data='b', op='UPDATE_AFTER')
+ * </ul>
+ */
+public class CreateChangelogViewProcedure extends BaseProcedure {
+
+  // Arguments accepted by the procedure; only "table" is required.
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {
+        ProcedureParameter.required("table", DataTypes.StringType),
+        ProcedureParameter.optional("changelog_view", DataTypes.StringType),
+        ProcedureParameter.optional("options", STRING_MAP),
+        ProcedureParameter.optional("compute_updates", DataTypes.BooleanType),
+        ProcedureParameter.optional("remove_carryovers", DataTypes.BooleanType),
+        ProcedureParameter.optional("identifier_columns", STRING_ARRAY),
+      };
+
+  // Positions of the arguments in the argument row; they mirror the order of PARAMETERS.
+  private static final int TABLE_NAME_ORDINAL = 0;
+  private static final int CHANGELOG_VIEW_NAME_ORDINAL = 1;
+  private static final int OPTIONS_ORDINAL = 2;
+  private static final int COMPUTE_UPDATES_ORDINAL = 3;
+  private static final int REMOVE_CARRYOVERS_ORDINAL = 4;
+  private static final int IDENTIFIER_COLUMNS_ORDINAL = 5;
+
+  // Output schema: a single non-nullable string column named "changelog_view".
+  private static final StructType OUTPUT_TYPE =
+      new StructType(
+          new StructField[] {
+            new StructField("changelog_view", DataTypes.StringType, false, Metadata.empty())
+          });
+
+  /** Returns a builder that creates the procedure, passing it the builder's table catalog. */
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<CreateChangelogViewProcedure>() {
+      @Override
+      protected CreateChangelogViewProcedure doBuild() {
+        return new CreateChangelogViewProcedure(tableCatalog());
+      }
+    };
+  }
+
+  // Private: instances are created through builder().
+  private CreateChangelogViewProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  /** Returns the parameters accepted by this procedure. */
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  /** Returns the schema of the rows produced by this procedure. */
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  /**
+   * Creates or replaces a temporary view over the changelog of the given table.
+   *
+   * <p>When update images are requested, the changelog is transformed using the identifier
+   * columns. Otherwise carry-over rows are removed unless that was disabled. The
+   * "remove_carryovers" argument is only consulted when update images are not computed.
+   *
+   * @param args procedure arguments, laid out in the order of {@code PARAMETERS}
+   * @return the output rows built from the view name
+   * @throws IllegalArgumentException if update images are requested but no identifier columns
+   *     are available
+   */
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    Identifier tableIdent =
+        toIdentifier(args.getString(TABLE_NAME_ORDINAL), PARAMETERS[TABLE_NAME_ORDINAL].name());
+
+    // Read data from the table.changes
+    Dataset<Row> df = changelogRecords(changelogTableName(tableIdent), readOptions(args));
+
+    if (computeUpdateImages(args)) {
+      String[] identifierColumns = identifierColumns(args);
+
+      Preconditions.checkArgument(
+          identifierColumns.length > 0,
+          "Cannot compute the update-rows because identifier columns are not set");
+
+      Column[] repartitionColumns = getRepartitionExpr(df, identifierColumns);
+      df = transform(df, repartitionColumns);
+    } else if (removeCarryoverRows(args)) {
+      df = removeCarryoverRows(df);
+    }
+
+    // only the table name (without its namespace) is used for the default view name
+    String viewName = viewName(args, tableIdent.name());
+
+    df.createOrReplaceTempView(viewName);
+
+    return toOutputRows(viewName);
+  }
+
+  /**
+   * Decides whether pre/post update images should be computed.
+   *
+   * <p>An explicit "compute_updates" argument always wins. If it is not set, update images are
+   * computed exactly when the "identifier_columns" argument is set.
+   *
+   * @param args procedure arguments
+   * @return true if update images should be computed
+   */
+  private boolean computeUpdateImages(InternalRow args) {
+    return args.isNullAt(COMPUTE_UPDATES_ORDINAL)
+        ? !args.isNullAt(IDENTIFIER_COLUMNS_ORDINAL)
+        : args.getBoolean(COMPUTE_UPDATES_ORDINAL);
+  }
+
+  /**
+   * Tells whether carry-over rows should be removed; defaults to true when the argument is null.
+   *
+   * @param args procedure arguments
+   * @return true if carry-over rows should be removed
+   */
+  private boolean removeCarryoverRows(InternalRow args) {
+    return args.isNullAt(REMOVE_CARRYOVERS_ORDINAL) || args.getBoolean(REMOVE_CARRYOVERS_ORDINAL);
+  }
+
+  /**
+   * Applies the changelog transformation using every column except the change type column as
+   * repartition columns.
+   *
+   * @param df changelog rows
+   * @return the transformed changelog rows
+   */
+  private Dataset<Row> removeCarryoverRows(Dataset<Row> df) {
+    String changeTypeColumn = MetadataColumns.CHANGE_TYPE.name();
+    List<Column> columns = Lists.newArrayList();
+
+    for (String columnName : df.columns()) {
+      if (!columnName.equals(changeTypeColumn)) {
+        columns.add(df.col(columnName));
+      }
+    }
+
+    return transform(df, columns.toArray(new Column[0]));
+  }
+
+  /**
+   * Resolves the identifier columns used to compute update images.
+   *
+   * <p>Columns passed through the "identifier_columns" argument take precedence. If the argument
+   * is null or empty, the identifier field names of the table schema are used instead.
+   *
+   * @param args procedure arguments
+   * @return the identifier column names; empty if neither source provides any
+   */
+  private String[] identifierColumns(InternalRow args) {
+    if (!args.isNullAt(IDENTIFIER_COLUMNS_ORDINAL)) {
+      Object[] rawColumns = args.getArray(IDENTIFIER_COLUMNS_ORDINAL).array();
+      if (rawColumns.length > 0) {
+        String[] columnNames = new String[rawColumns.length];
+        for (int pos = 0; pos < rawColumns.length; pos++) {
+          columnNames[pos] = rawColumns[pos].toString();
+        }
+        return columnNames;
+      }
+    }
+
+    // nothing was passed explicitly, fall back to the identifier fields of the table schema
+    Identifier tableIdent =
+        toIdentifier(args.getString(TABLE_NAME_ORDINAL), PARAMETERS[TABLE_NAME_ORDINAL].name());
+    Table table = loadSparkTable(tableIdent).table();
+    return table.schema().identifierFieldNames().toArray(new String[0]);
+  }
+
+  /**
+   * Loads the changelog table as a DataFrame.
+   *
+   * @param tableName name of the changelog table, handed as-is to the Spark reader
+   * @param readOptions options applied to the reader before the table is loaded
+   * @return the rows of the changelog table
+   */
+  private Dataset<Row> changelogRecords(String tableName, Map<String, String> readOptions) {
+    // no need to validate the read options here since the reader will validate them
+    return spark().read().options(readOptions).table(tableName);
+  }
+
+  /**
+   * Builds the quoted, catalog-qualified name of the changelog table for the given table.
+   *
+   * <p>The table name is appended to the table's namespace and the changelog table name is used
+   * as the final identifier part.
+   *
+   * @param tableIdent identifier of the table whose changelog is requested
+   * @return the quoted full identifier of the changelog table
+   */
+  private String changelogTableName(Identifier tableIdent) {
+    String[] tableNamespace = tableIdent.namespace();
+    // the table itself becomes the last namespace level of the changelog table
+    String[] changelogNamespace = Arrays.copyOf(tableNamespace, tableNamespace.length + 1);
+    changelogNamespace[tableNamespace.length] = tableIdent.name();
+    Identifier changelogIdent =
+        Identifier.of(changelogNamespace, SparkChangelogTable.TABLE_NAME);
+    return Spark3Util.quotedFullIdentifier(tableCatalog().name(), changelogIdent);
+  }
+
+  /**
+   * Extracts the reader options from the procedure arguments.
+   *
+   * @param args procedure arguments
+   * @return a mutable map of the provided options; empty if the "options" argument is null
+   */
+  private Map<String, String> readOptions(InternalRow args) {
+    Map<String, String> result = Maps.newHashMap();
+
+    if (args.isNullAt(OPTIONS_ORDINAL)) {
+      return result;
+    }
+
+    // copy every key/value pair of the Spark map into a Java map of strings
+    args.getMap(OPTIONS_ORDINAL)
+        .foreach(
+            DataTypes.StringType,
+            DataTypes.StringType,
+            (key, value) -> {
+              result.put(key.toString(), value.toString());
+              return BoxedUnit.UNIT;
+            });
+
+    return result;
+  }
+
+  /**
+   * Resolves the name of the view that exposes the changelog.
+   *
+   * <p>An explicitly provided view name is returned unchanged. Otherwise the name defaults to the
+   * table name followed by {@code _changes}. The default is wrapped in backticks so that table
+   * names containing dots or other special characters still form a single identifier; backticks
+   * inside the table name are doubled to escape them.
+   *
+   * @param args procedure arguments
+   * @param tableName name of the table without its namespace
+   * @return the view name to use
+   */
+  @NotNull
+  private static String viewName(InternalRow args, String tableName) {
+    if (args.isNullAt(CHANGELOG_VIEW_NAME_ORDINAL)) {
+      // the caller already passes the bare table name, so no namespace has to be stripped here
+      return String.format("`%s_changes`", tableName.replace("`", "``"));
+    } else {
+      return args.getString(CHANGELOG_VIEW_NAME_ORDINAL);
+    }
+  }
+
+  /**
+   * Repartitions the changelog by the given columns, sorts each partition and runs the rows of
+   * every partition through a {@code ChangelogIterator}.
+   *
+   * @param df changelog rows
+   * @param repartitionColumns columns to repartition by; their string forms are passed to the
+   *     iterator as identifier fields
+   * @return the transformed rows, with the same schema as the input
+   */
+  private Dataset<Row> transform(Dataset<Row> df, Column[] repartitionColumns) {
+    Column[] ordering = sortSpec(df, repartitionColumns);
+    StructType rowType = df.schema();
+
+    String[] identifierFields = new String[repartitionColumns.length];
+    for (int pos = 0; pos < repartitionColumns.length; pos++) {
+      identifierFields[pos] = repartitionColumns[pos].toString();
+    }
+
+    MapPartitionsFunction<Row, Row> applyIterator =
+        rows -> ChangelogIterator.create(rows, rowType, identifierFields);
+
+    return df.repartition(repartitionColumns)
+        .sortWithinPartitions(ordering)
+        .mapPartitions(applyIterator, RowEncoder.apply(rowType));
+  }
+
+  @NotNull
+  private static Column[] getRepartitionExpr(Dataset<Row> df, String[] identifiers) {

Review Comment:
   If we decide to add `computeUpdateImages`, I would put this logic there directly, like you did for carryovers.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.ChangelogIterator;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.spark.api.java.function.MapPartitionsFunction;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.jetbrains.annotations.NotNull;
+import scala.runtime.BoxedUnit;
+
+/**
+ * A procedure that creates a view for changed rows.
+ *
+ * <p>The procedure removes the carry-over rows by default. If you want to keep them, you can set
+ * the "remove_carryovers" procedure argument to false.
+ *
+ * <p>By default, the procedure computes the pre/post update images only when the
+ * "identifier_columns" argument is set. To override that default, set the "compute_updates"
+ * procedure argument to true or false.
+ *
+ * <p>Carry-over rows are the result of a removal and insertion of the same row within an operation
+ * because of the copy-on-write mechanism. For example, given a file which contains row1 (id=1,
+ * data='a') and row2 (id=2, data='b'). A copy-on-write delete of row2 would require erasing this
+ * file and preserving row1 in a new file. The changelog table would report this as (id=1, data='a',
+ * op='DELETE') and (id=1, data='a', op='INSERT'), despite it not being an actual change to the
+ * table. The procedure finds the carry-over rows and removes them from the result.
+ *
+ * <p>Pre/post update images are converted from a pair of a delete row and an insert row. Identifier
+ * columns are used for determining whether an insert and a delete record refer to the same row. If
+ * the two records share the same values for the identity columns they are considered to be before
+ * and after states of the same row. You can either set identifier fields in the table schema or
+ * input them as the procedure parameters. Here is an example of pre/post update images with an
+ * identifier column(id). A pair of a delete row and an insert row with the same id:
+ *
+ * <ul>
+ *   <li>(id=1, data='a', op='DELETE')
+ *   <li>(id=1, data='b', op='INSERT')
+ * </ul>
+ *
+ * <p>will be marked as pre/post update images:
+ *
+ * <ul>
+ *   <li>(id=1, data='a', op='UPDATE_BEFORE')
+ *   <li>(id=1, data='b', op='UPDATE_AFTER')
+ * </ul>
+ */
+public class CreateChangelogViewProcedure extends BaseProcedure {
+
+  // Arguments accepted by the procedure; only "table" is required.
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {
+        ProcedureParameter.required("table", DataTypes.StringType),
+        ProcedureParameter.optional("changelog_view", DataTypes.StringType),
+        ProcedureParameter.optional("options", STRING_MAP),
+        ProcedureParameter.optional("compute_updates", DataTypes.BooleanType),
+        ProcedureParameter.optional("remove_carryovers", DataTypes.BooleanType),
+        ProcedureParameter.optional("identifier_columns", STRING_ARRAY),
+      };
+
+  // Positions of the arguments in the argument row; they mirror the order of PARAMETERS.
+  private static final int TABLE_NAME_ORDINAL = 0;
+  private static final int CHANGELOG_VIEW_NAME_ORDINAL = 1;
+  private static final int OPTIONS_ORDINAL = 2;
+  private static final int COMPUTE_UPDATES_ORDINAL = 3;
+  private static final int REMOVE_CARRYOVERS_ORDINAL = 4;
+  private static final int IDENTIFIER_COLUMNS_ORDINAL = 5;
+
+  // Output schema: a single non-nullable string column named "changelog_view".
+  private static final StructType OUTPUT_TYPE =
+      new StructType(
+          new StructField[] {
+            new StructField("changelog_view", DataTypes.StringType, false, Metadata.empty())
+          });
+
+  /** Returns a builder that creates the procedure, passing it the builder's table catalog. */
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<CreateChangelogViewProcedure>() {
+      @Override
+      protected CreateChangelogViewProcedure doBuild() {
+        return new CreateChangelogViewProcedure(tableCatalog());
+      }
+    };
+  }
+
+  // Private: instances are created through builder().
+  private CreateChangelogViewProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  /** Returns the parameters accepted by this procedure. */
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  /** Returns the schema of the rows produced by this procedure. */
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  /**
+   * Creates or replaces a temporary view over the changelog of the given table.
+   *
+   * <p>When update images are requested, the changelog is transformed using the identifier
+   * columns. Otherwise carry-over rows are removed unless that was disabled. The
+   * "remove_carryovers" argument is only consulted when update images are not computed.
+   *
+   * @param args procedure arguments, laid out in the order of {@code PARAMETERS}
+   * @return the output rows built from the view name
+   * @throws IllegalArgumentException if update images are requested but no identifier columns
+   *     are available
+   */
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    Identifier tableIdent =
+        toIdentifier(args.getString(TABLE_NAME_ORDINAL), PARAMETERS[TABLE_NAME_ORDINAL].name());
+
+    // Read data from the table.changes
+    Dataset<Row> df = changelogRecords(changelogTableName(tableIdent), readOptions(args));
+
+    if (computeUpdateImages(args)) {
+      String[] identifierColumns = identifierColumns(args);
+
+      Preconditions.checkArgument(
+          identifierColumns.length > 0,
+          "Cannot compute the update-rows because identifier columns are not set");
+
+      Column[] repartitionColumns = getRepartitionExpr(df, identifierColumns);
+      df = transform(df, repartitionColumns);
+    } else if (removeCarryoverRows(args)) {
+      df = removeCarryoverRows(df);
+    }
+
+    // only the table name (without its namespace) is used for the default view name
+    String viewName = viewName(args, tableIdent.name());
+
+    df.createOrReplaceTempView(viewName);
+
+    return toOutputRows(viewName);
+  }
+
+  /**
+   * Decides whether pre/post update images should be computed.
+   *
+   * <p>An explicit "compute_updates" argument always wins. If it is not set, update images are
+   * computed exactly when the "identifier_columns" argument is set.
+   *
+   * @param args procedure arguments
+   * @return true if update images should be computed
+   */
+  private boolean computeUpdateImages(InternalRow args) {
+    return args.isNullAt(COMPUTE_UPDATES_ORDINAL)
+        ? !args.isNullAt(IDENTIFIER_COLUMNS_ORDINAL)
+        : args.getBoolean(COMPUTE_UPDATES_ORDINAL);
+  }
+
+  /**
+   * Tells whether carry-over rows should be removed; defaults to true when the argument is null.
+   *
+   * @param args procedure arguments
+   * @return true if carry-over rows should be removed
+   */
+  private boolean removeCarryoverRows(InternalRow args) {
+    return args.isNullAt(REMOVE_CARRYOVERS_ORDINAL) || args.getBoolean(REMOVE_CARRYOVERS_ORDINAL);
+  }
+
+  /**
+   * Applies the changelog transformation using every column except the change type column as
+   * repartition columns.
+   *
+   * @param df changelog rows
+   * @return the transformed changelog rows
+   */
+  private Dataset<Row> removeCarryoverRows(Dataset<Row> df) {
+    String changeTypeColumn = MetadataColumns.CHANGE_TYPE.name();
+    List<Column> columns = Lists.newArrayList();
+
+    for (String columnName : df.columns()) {
+      if (!columnName.equals(changeTypeColumn)) {
+        columns.add(df.col(columnName));
+      }
+    }
+
+    return transform(df, columns.toArray(new Column[0]));
+  }
+
+  /**
+   * Resolves the identifier columns used to compute update images.
+   *
+   * <p>Columns passed through the "identifier_columns" argument take precedence. If the argument
+   * is null or empty, the identifier field names of the table schema are used instead.
+   *
+   * @param args procedure arguments
+   * @return the identifier column names; empty if neither source provides any
+   */
+  private String[] identifierColumns(InternalRow args) {
+    if (!args.isNullAt(IDENTIFIER_COLUMNS_ORDINAL)) {
+      Object[] rawColumns = args.getArray(IDENTIFIER_COLUMNS_ORDINAL).array();
+      if (rawColumns.length > 0) {
+        String[] columnNames = new String[rawColumns.length];
+        for (int pos = 0; pos < rawColumns.length; pos++) {
+          columnNames[pos] = rawColumns[pos].toString();
+        }
+        return columnNames;
+      }
+    }
+
+    // nothing was passed explicitly, fall back to the identifier fields of the table schema
+    Identifier tableIdent =
+        toIdentifier(args.getString(TABLE_NAME_ORDINAL), PARAMETERS[TABLE_NAME_ORDINAL].name());
+    Table table = loadSparkTable(tableIdent).table();
+    return table.schema().identifierFieldNames().toArray(new String[0]);
+  }
+
+  /**
+   * Loads the changelog table as a DataFrame.
+   *
+   * @param tableName name of the changelog table, handed as-is to the Spark reader
+   * @param readOptions options applied to the reader before the table is loaded
+   * @return the rows of the changelog table
+   */
+  private Dataset<Row> changelogRecords(String tableName, Map<String, String> readOptions) {
+    // no need to validate the read options here since the reader will validate them
+    return spark().read().options(readOptions).table(tableName);
+  }
+
+  /**
+   * Builds the quoted, catalog-qualified name of the changelog table for the given table.
+   *
+   * <p>The table name is appended to the table's namespace and the changelog table name is used
+   * as the final identifier part.
+   *
+   * @param tableIdent identifier of the table whose changelog is requested
+   * @return the quoted full identifier of the changelog table
+   */
+  private String changelogTableName(Identifier tableIdent) {
+    String[] tableNamespace = tableIdent.namespace();
+    // the table itself becomes the last namespace level of the changelog table
+    String[] changelogNamespace = Arrays.copyOf(tableNamespace, tableNamespace.length + 1);
+    changelogNamespace[tableNamespace.length] = tableIdent.name();
+    Identifier changelogIdent =
+        Identifier.of(changelogNamespace, SparkChangelogTable.TABLE_NAME);
+    return Spark3Util.quotedFullIdentifier(tableCatalog().name(), changelogIdent);
+  }
+
+  /**
+   * Extracts the reader options from the procedure arguments.
+   *
+   * @param args procedure arguments
+   * @return a mutable map of the provided options; empty if the "options" argument is null
+   */
+  private Map<String, String> readOptions(InternalRow args) {
+    Map<String, String> result = Maps.newHashMap();
+
+    if (args.isNullAt(OPTIONS_ORDINAL)) {
+      return result;
+    }
+
+    // copy every key/value pair of the Spark map into a Java map of strings
+    args.getMap(OPTIONS_ORDINAL)
+        .foreach(
+            DataTypes.StringType,
+            DataTypes.StringType,
+            (key, value) -> {
+              result.put(key.toString(), value.toString());
+              return BoxedUnit.UNIT;
+            });
+
+    return result;
+  }
+
+  @NotNull
+  private static String viewName(InternalRow args, String tableName) {
+    String viewName =
+        args.isNullAt(CHANGELOG_VIEW_NAME_ORDINAL)
+            ? null
+            : args.getString(CHANGELOG_VIEW_NAME_ORDINAL);
+    if (viewName == null) {
+      String shortTableName =
+          tableName.contains(".") ? tableName.substring(tableName.lastIndexOf(".") + 1) : tableName;

Review Comment:
   We already pass `ident.name()` to this method. Instead of checking for dots in the name, I think we can use the approach from the snippet above and escape the name using backticks.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.ChangelogIterator;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.spark.api.java.function.MapPartitionsFunction;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.jetbrains.annotations.NotNull;
+import scala.runtime.BoxedUnit;
+
+/**
+ * A procedure that creates a view for changed rows.
+ *
+ * <p>The procedure removes the carry-over rows by default. If you want to keep them, you can set
+ * the "remove_carryovers" procedure argument to false.
+ *
+ * <p>By default, the procedure computes the pre/post update images only when the
+ * "identifier_columns" argument is set. To override that default, set the "compute_updates"
+ * procedure argument to true or false.
+ *
+ * <p>Carry-over rows are the result of a removal and insertion of the same row within an operation
+ * because of the copy-on-write mechanism. For example, given a file which contains row1 (id=1,
+ * data='a') and row2 (id=2, data='b'). A copy-on-write delete of row2 would require erasing this
+ * file and preserving row1 in a new file. The changelog table would report this as (id=1, data='a',
+ * op='DELETE') and (id=1, data='a', op='INSERT'), despite it not being an actual change to the
+ * table. The procedure finds the carry-over rows and removes them from the result.
+ *
+ * <p>Pre/post update images are converted from a pair of a delete row and an insert row. Identifier
+ * columns are used for determining whether an insert and a delete record refer to the same row. If
+ * the two records share the same values for the identity columns they are considered to be before
+ * and after states of the same row. You can either set identifier fields in the table schema or
+ * input them as the procedure parameters. Here is an example of pre/post update images with an
+ * identifier column(id). A pair of a delete row and an insert row with the same id:
+ *
+ * <ul>
+ *   <li>(id=1, data='a', op='DELETE')
+ *   <li>(id=1, data='b', op='INSERT')
+ * </ul>
+ *
+ * <p>will be marked as pre/post update images:
+ *
+ * <ul>
+ *   <li>(id=1, data='a', op='UPDATE_BEFORE')
+ *   <li>(id=1, data='b', op='UPDATE_AFTER')
+ * </ul>
+ */
+public class CreateChangelogViewProcedure extends BaseProcedure {
+
+  // Arguments accepted by the procedure; only "table" is required.
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {
+        ProcedureParameter.required("table", DataTypes.StringType),
+        ProcedureParameter.optional("changelog_view", DataTypes.StringType),
+        ProcedureParameter.optional("options", STRING_MAP),
+        ProcedureParameter.optional("compute_updates", DataTypes.BooleanType),
+        ProcedureParameter.optional("remove_carryovers", DataTypes.BooleanType),
+        ProcedureParameter.optional("identifier_columns", STRING_ARRAY),
+      };
+
+  // Positions of the arguments in the argument row; they mirror the order of PARAMETERS.
+  private static final int TABLE_NAME_ORDINAL = 0;
+  private static final int CHANGELOG_VIEW_NAME_ORDINAL = 1;
+  private static final int OPTIONS_ORDINAL = 2;
+  private static final int COMPUTE_UPDATES_ORDINAL = 3;
+  private static final int REMOVE_CARRYOVERS_ORDINAL = 4;
+  private static final int IDENTIFIER_COLUMNS_ORDINAL = 5;
+
+  // Output schema: a single non-nullable string column named "changelog_view".
+  private static final StructType OUTPUT_TYPE =
+      new StructType(
+          new StructField[] {
+            new StructField("changelog_view", DataTypes.StringType, false, Metadata.empty())
+          });
+
+  /** Returns a builder that creates the procedure, passing it the builder's table catalog. */
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<CreateChangelogViewProcedure>() {
+      @Override
+      protected CreateChangelogViewProcedure doBuild() {
+        return new CreateChangelogViewProcedure(tableCatalog());
+      }
+    };
+  }
+
+  // Private: instances are created through builder().
+  private CreateChangelogViewProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  /** Returns the parameters accepted by this procedure. */
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  /** Returns the schema of the rows produced by this procedure. */
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  /**
+   * Creates or replaces a temporary view over the changelog of the given table.
+   *
+   * <p>When update images are requested, the changelog is transformed using the identifier
+   * columns. Otherwise carry-over rows are removed unless that was disabled. The
+   * "remove_carryovers" argument is only consulted when update images are not computed.
+   *
+   * @param args procedure arguments, laid out in the order of {@code PARAMETERS}
+   * @return the output rows built from the view name
+   * @throws IllegalArgumentException if update images are requested but no identifier columns
+   *     are available
+   */
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    Identifier tableIdent =
+        toIdentifier(args.getString(TABLE_NAME_ORDINAL), PARAMETERS[TABLE_NAME_ORDINAL].name());
+
+    // Read data from the table.changes
+    Dataset<Row> df = changelogRecords(changelogTableName(tableIdent), readOptions(args));
+
+    if (computeUpdateImages(args)) {
+      String[] identifierColumns = identifierColumns(args);
+
+      Preconditions.checkArgument(
+          identifierColumns.length > 0,
+          "Cannot compute the update-rows because identifier columns are not set");
+
+      Column[] repartitionColumns = getRepartitionExpr(df, identifierColumns);
+      df = transform(df, repartitionColumns);
+    } else if (removeCarryoverRows(args)) {
+      df = removeCarryoverRows(df);
+    }
+
+    // only the table name (without its namespace) is used for the default view name
+    String viewName = viewName(args, tableIdent.name());
+
+    df.createOrReplaceTempView(viewName);
+
+    return toOutputRows(viewName);
+  }
+
+  /**
+   * Decides whether pre/post update images should be computed.
+   *
+   * <p>An explicit "compute_updates" argument always wins. If it is not set, update images are
+   * computed exactly when the "identifier_columns" argument is set.
+   *
+   * @param args procedure arguments
+   * @return true if update images should be computed
+   */
+  private boolean computeUpdateImages(InternalRow args) {
+    return args.isNullAt(COMPUTE_UPDATES_ORDINAL)
+        ? !args.isNullAt(IDENTIFIER_COLUMNS_ORDINAL)
+        : args.getBoolean(COMPUTE_UPDATES_ORDINAL);
+  }
+
+  /**
+   * Tells whether carry-over rows should be removed; defaults to true when the argument is null.
+   *
+   * @param args procedure arguments
+   * @return true if carry-over rows should be removed
+   */
+  private boolean removeCarryoverRows(InternalRow args) {
+    return args.isNullAt(REMOVE_CARRYOVERS_ORDINAL) || args.getBoolean(REMOVE_CARRYOVERS_ORDINAL);
+  }
+
+  /**
+   * Applies the changelog transformation using every column except the change type column as
+   * repartition columns.
+   *
+   * @param df changelog rows
+   * @return the transformed changelog rows
+   */
+  private Dataset<Row> removeCarryoverRows(Dataset<Row> df) {
+    String changeTypeColumn = MetadataColumns.CHANGE_TYPE.name();
+    List<Column> columns = Lists.newArrayList();
+
+    for (String columnName : df.columns()) {
+      if (!columnName.equals(changeTypeColumn)) {
+        columns.add(df.col(columnName));
+      }
+    }
+
+    return transform(df, columns.toArray(new Column[0]));
+  }
+
+  /**
+   * Resolves the identifier columns used to compute update images.
+   *
+   * <p>Columns passed through the "identifier_columns" argument take precedence. If the argument
+   * is null or empty, the identifier field names of the table schema are used instead.
+   *
+   * @param args procedure arguments
+   * @return the identifier column names; empty if neither source provides any
+   */
+  private String[] identifierColumns(InternalRow args) {
+    if (!args.isNullAt(IDENTIFIER_COLUMNS_ORDINAL)) {
+      Object[] rawColumns = args.getArray(IDENTIFIER_COLUMNS_ORDINAL).array();
+      if (rawColumns.length > 0) {
+        String[] columnNames = new String[rawColumns.length];
+        for (int pos = 0; pos < rawColumns.length; pos++) {
+          columnNames[pos] = rawColumns[pos].toString();
+        }
+        return columnNames;
+      }
+    }
+
+    // nothing was passed explicitly, fall back to the identifier fields of the table schema
+    Identifier tableIdent =
+        toIdentifier(args.getString(TABLE_NAME_ORDINAL), PARAMETERS[TABLE_NAME_ORDINAL].name());
+    Table table = loadSparkTable(tableIdent).table();
+    return table.schema().identifierFieldNames().toArray(new String[0]);
+  }
+
+  /**
+   * Loads the changelog table as a DataFrame.
+   *
+   * @param tableName name of the changelog table, handed as-is to the Spark reader
+   * @param readOptions options applied to the reader before the table is loaded
+   * @return the rows of the changelog table
+   */
+  private Dataset<Row> changelogRecords(String tableName, Map<String, String> readOptions) {
+    // no need to validate the read options here since the reader will validate them
+    return spark().read().options(readOptions).table(tableName);
+  }
+
+  /**
+   * Builds the quoted, catalog-qualified name of the changelog table for the given table.
+   *
+   * <p>The table name is appended to the table's namespace and the changelog table name is used
+   * as the final identifier part.
+   *
+   * @param tableIdent identifier of the table whose changelog is requested
+   * @return the quoted full identifier of the changelog table
+   */
+  private String changelogTableName(Identifier tableIdent) {
+    String[] tableNamespace = tableIdent.namespace();
+    // the table itself becomes the last namespace level of the changelog table
+    String[] changelogNamespace = Arrays.copyOf(tableNamespace, tableNamespace.length + 1);
+    changelogNamespace[tableNamespace.length] = tableIdent.name();
+    Identifier changelogIdent =
+        Identifier.of(changelogNamespace, SparkChangelogTable.TABLE_NAME);
+    return Spark3Util.quotedFullIdentifier(tableCatalog().name(), changelogIdent);
+  }
+
+  /**
+   * Extracts the reader options from the procedure arguments.
+   *
+   * @param args procedure arguments
+   * @return a mutable map of the provided options; empty if the "options" argument is null
+   */
+  private Map<String, String> readOptions(InternalRow args) {
+    Map<String, String> result = Maps.newHashMap();
+
+    if (args.isNullAt(OPTIONS_ORDINAL)) {
+      return result;
+    }
+
+    // copy every key/value pair of the Spark map into a Java map of strings
+    args.getMap(OPTIONS_ORDINAL)
+        .foreach(
+            DataTypes.StringType,
+            DataTypes.StringType,
+            (key, value) -> {
+              result.put(key.toString(), value.toString());
+              return BoxedUnit.UNIT;
+            });
+
+    return result;
+  }
+
+  @NotNull
+  private static String viewName(InternalRow args, String tableName) {
+    String viewName =
+        args.isNullAt(CHANGELOG_VIEW_NAME_ORDINAL)
+            ? null
+            : args.getString(CHANGELOG_VIEW_NAME_ORDINAL);
+    if (viewName == null) {

Review Comment:
   What about having if/else instead of an extra var and ternary operator above?
   
   ```
   if (args.isNullAt(CHANGELOG_VIEW_NAME_ORDINAL)) {
     return String.format("`%s_changes`", tableName);
   } else {
     return args.getString(CHANGELOG_VIEW_NAME_ORDINAL);
   }
   ```



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.ChangelogIterator;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.spark.api.java.function.MapPartitionsFunction;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.jetbrains.annotations.NotNull;
+import scala.runtime.BoxedUnit;
+
+/**
+ * A procedure that creates a view for changed rows.
+ *
+ * <p>The procedure removes the carry-over rows by default. If you want to keep them, you can set
+ * "remove_carryovers" to be false in the options.
+ *
+ * <p>The procedure doesn't compute the pre/post update images by default. If you want to compute
+ * them, you can set "compute_updates" to be true in the options.
+ *
+ * <p>Carry-over rows are the result of a removal and insertion of the same row within an operation
+ * because of the copy-on-write mechanism. For example, given a file which contains row1 (id=1,
+ * data='a') and row2 (id=2, data='b'). A copy-on-write delete of row2 would require erasing this
+ * file and preserving row1 in a new file. The changelog table would report this as (id=1, data='a',
+ * op='DELETE') and (id=1, data='a', op='INSERT'), despite it not being an actual change to the
+ * table. The procedure finds the carry-over rows and removes them from the result.
+ *
+ * <p>Pre/post update images are converted from a pair of a delete row and an insert row. Identifier
+ * columns are used for determining whether an insert and a delete record refer to the same row. If
+ * the two records share the same values for the identity columns they are considered to be before
+ * and after states of the same row. You can either set identifier fields in the table schema or
+ * input them as the procedure parameters. Here is an example of pre/post update images with an
+ * identifier column(id). A pair of a delete row and an insert row with the same id:
+ *
+ * <ul>
+ *   <li>(id=1, data='a', op='DELETE')
+ *   <li>(id=1, data='b', op='INSERT')
+ * </ul>
+ *
+ * <p>will be marked as pre/post update images:
+ *
+ * <ul>
+ *   <li>(id=1, data='a', op='UPDATE_BEFORE')
+ *   <li>(id=1, data='b', op='UPDATE_AFTER')
+ * </ul>
+ */
+public class CreateChangelogViewProcedure extends BaseProcedure {
+
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {
+        ProcedureParameter.required("table", DataTypes.StringType),
+        ProcedureParameter.optional("changelog_view", DataTypes.StringType),
+        ProcedureParameter.optional("options", STRING_MAP),
+        ProcedureParameter.optional("compute_updates", DataTypes.BooleanType),
+        ProcedureParameter.optional("remove_carryovers", DataTypes.BooleanType),
+        ProcedureParameter.optional("identifier_columns", STRING_ARRAY),
+      };
+
+  private static final int TABLE_NAME_ORDINAL = 0;
+  private static final int CHANGELOG_VIEW_NAME_ORDINAL = 1;
+  private static final int OPTIONS_ORDINAL = 2;
+  private static final int COMPUTE_UPDATES_ORDINAL = 3;
+  private static final int REMOVE_CARRYOVERS_ORDINAL = 4;
+  private static final int IDENTIFIER_COLUMNS_ORDINAL = 5;
+
+  private static final StructType OUTPUT_TYPE =
+      new StructType(
+          new StructField[] {
+            new StructField("changelog_view", DataTypes.StringType, false, Metadata.empty())
+          });
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<CreateChangelogViewProcedure>() {
+      @Override
+      protected CreateChangelogViewProcedure doBuild() {
+        return new CreateChangelogViewProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private CreateChangelogViewProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    Identifier tableIdent =
+        toIdentifier(args.getString(TABLE_NAME_ORDINAL), PARAMETERS[TABLE_NAME_ORDINAL].name());
+
+    // Read data from the table.changes
+    Dataset<Row> df = changelogRecords(changelogTableName(tableIdent), readOptions(args));
+
+    if (computeUpdateImages(args)) {
+      String[] identifierColumns = identifierColumns(args);
+
+      Preconditions.checkArgument(
+          identifierColumns.length > 0,
+          "Cannot compute the update-rows because identifier columns are not set");
+
+      Column[] repartitionColumns = getRepartitionExpr(df, identifierColumns);
+      df = transform(df, repartitionColumns);
+    } else if (removeCarryoverRows(args)) {
+      df = removeCarryoverRows(df);
+    }
+
+    String viewName = viewName(args, tableIdent.name());
+
+    df.createOrReplaceTempView(viewName);
+
+    return toOutputRows(viewName);
+  }
+
+  private boolean computeUpdateImages(InternalRow args) {
+    if (!args.isNullAt(COMPUTE_UPDATES_ORDINAL)) {
+      return args.getBoolean(COMPUTE_UPDATES_ORDINAL);
+    }
+
+    // If the identifier columns are set, we compute pre/post update images by default.
+    if (!args.isNullAt(IDENTIFIER_COLUMNS_ORDINAL)) {
+      return true;
+    }
+
+    return false;
+  }
+
+  private boolean removeCarryoverRows(InternalRow args) {
+    return args.isNullAt(REMOVE_CARRYOVERS_ORDINAL)
+        ? true
+        : args.getBoolean(REMOVE_CARRYOVERS_ORDINAL);
+  }
+
+  private Dataset<Row> removeCarryoverRows(Dataset<Row> df) {
+    Column[] repartitionColumns =
+        Arrays.stream(df.columns())
+            .filter(c -> !c.equals(MetadataColumns.CHANGE_TYPE.name()))
+            .map(df::col)
+            .toArray(Column[]::new);
+    return transform(df, repartitionColumns);
+  }
+
+  private String[] identifierColumns(InternalRow args) {
+    String[] identifierColumns = new String[0];
+
+    if (!args.isNullAt(IDENTIFIER_COLUMNS_ORDINAL)) {
+      identifierColumns =
+          Arrays.stream(args.getArray(IDENTIFIER_COLUMNS_ORDINAL).array())
+              .map(column -> column.toString())
+              .toArray(String[]::new);
+    }
+
+    if (identifierColumns.length == 0) {

Review Comment:
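   I think this should be an if/else. If someone explicitly provides an empty list of identifier columns, we should complain instead of silently falling back to the identifier fields from the table schema.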
   
   ```
   if (!args.isNullAt(IDENTIFIER_COLUMNS_ORDINAL)) {
     return ...;
   } else {
     return ...;
   }
   ```



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java:
##########
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.ChangelogIterator;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.spark.api.java.function.MapPartitionsFunction;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.jetbrains.annotations.NotNull;
+import scala.runtime.BoxedUnit;
+
+/**
+ * A procedure that creates a view for changed rows.
+ *
+ * <p>The procedure removes the carry-over rows by default. If you want to keep them, you can set
+ * "remove_carryovers" to be false in the options.
+ *
+ * <p>The procedure doesn't compute the pre/post update images by default. If you want to compute
+ * them, you can set "compute_updates" to be true in the options.
+ *
+ * <p>Carry-over rows are the result of a removal and insertion of the same row within an operation
+ * because of the copy-on-write mechanism. For example, given a file which contains row1 (id=1,
+ * data='a') and row2 (id=2, data='b'). A copy-on-write delete of row2 would require erasing this
+ * file and preserving row1 in a new file. The changelog table would report this as (id=1, data='a',
+ * op='DELETE') and (id=1, data='a', op='INSERT'), despite it not being an actual change to the
+ * table. The procedure finds the carry-over rows and removes them from the result.
+ *
+ * <p>Pre/post update images are converted from a pair of a delete row and an insert row. Identifier
+ * columns are used for determining whether an insert and a delete record refer to the same row. If
+ * the two records share the same values for the identity columns they are considered to be before
+ * and after states of the same row. You can either set identifier fields in the table schema or
+ * input them as the procedure parameters. Here is an example of pre/post update images with an
+ * identifier column(id). A pair of a delete row and an insert row with the same id:
+ *
+ * <ul>
+ *   <li>(id=1, data='a', op='DELETE')
+ *   <li>(id=1, data='b', op='INSERT')
+ * </ul>
+ *
+ * <p>will be marked as pre/post update images:
+ *
+ * <ul>
+ *   <li>(id=1, data='a', op='UPDATE_BEFORE')
+ *   <li>(id=1, data='b', op='UPDATE_AFTER')
+ * </ul>
+ */
+public class CreateChangelogViewProcedure extends BaseProcedure {
+
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {
+        ProcedureParameter.required("table", DataTypes.StringType),
+        ProcedureParameter.optional("changelog_view", DataTypes.StringType),
+        ProcedureParameter.optional("options", STRING_MAP),
+        ProcedureParameter.optional("compute_updates", DataTypes.BooleanType),
+        ProcedureParameter.optional("remove_carryovers", DataTypes.BooleanType),
+        ProcedureParameter.optional("identifier_columns", STRING_ARRAY),
+      };
+
+  private static final int TABLE_NAME_ORDINAL = 0;
+  private static final int CHANGELOG_VIEW_NAME_ORDINAL = 1;
+  private static final int OPTIONS_ORDINAL = 2;
+  private static final int COMPUTE_UPDATES_ORDINAL = 3;
+  private static final int REMOVE_CARRYOVERS_ORDINAL = 4;
+  private static final int IDENTIFIER_COLUMNS_ORDINAL = 5;
+
+  private static final StructType OUTPUT_TYPE =
+      new StructType(
+          new StructField[] {
+            new StructField("changelog_view", DataTypes.StringType, false, Metadata.empty())
+          });
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<CreateChangelogViewProcedure>() {
+      @Override
+      protected CreateChangelogViewProcedure doBuild() {
+        return new CreateChangelogViewProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private CreateChangelogViewProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    Identifier tableIdent =
+        toIdentifier(args.getString(TABLE_NAME_ORDINAL), PARAMETERS[TABLE_NAME_ORDINAL].name());
+
+    // Read data from the table.changes
+    Dataset<Row> df = changelogRecords(changelogTableName(tableIdent), readOptions(args));
+
+    if (computeUpdateImages(args)) {
+      String[] identifierColumns = identifierColumns(args);
+
+      Preconditions.checkArgument(
+          identifierColumns.length > 0,
+          "Cannot compute the update-rows because identifier columns are not set");
+
+      Column[] repartitionColumns = getRepartitionExpr(df, identifierColumns);
+      df = transform(df, repartitionColumns);
+    } else if (removeCarryoverRows(args)) {
+      df = removeCarryoverRows(df);
+    }
+
+    String viewName = viewName(args, tableIdent.name());
+
+    df.createOrReplaceTempView(viewName);
+
+    return toOutputRows(viewName);
+  }
+
+  private boolean computeUpdateImages(InternalRow args) {
+    if (!args.isNullAt(COMPUTE_UPDATES_ORDINAL)) {
+      return args.getBoolean(COMPUTE_UPDATES_ORDINAL);
+    }
+
+    // If the identifier columns are set, we compute pre/post update images by default.
+    if (!args.isNullAt(IDENTIFIER_COLUMNS_ORDINAL)) {
+      return true;
+    }
+
+    return false;
+  }
+
+  private boolean removeCarryoverRows(InternalRow args) {
+    return args.isNullAt(REMOVE_CARRYOVERS_ORDINAL)
+        ? true
+        : args.getBoolean(REMOVE_CARRYOVERS_ORDINAL);
+  }
+
+  private Dataset<Row> removeCarryoverRows(Dataset<Row> df) {
+    Column[] repartitionColumns =
+        Arrays.stream(df.columns())
+            .filter(c -> !c.equals(MetadataColumns.CHANGE_TYPE.name()))
+            .map(df::col)
+            .toArray(Column[]::new);
+    return transform(df, repartitionColumns);
+  }
+
+  private String[] identifierColumns(InternalRow args) {
+    String[] identifierColumns = new String[0];
+
+    if (!args.isNullAt(IDENTIFIER_COLUMNS_ORDINAL)) {
+      identifierColumns =
+          Arrays.stream(args.getArray(IDENTIFIER_COLUMNS_ORDINAL).array())
+              .map(column -> column.toString())
+              .toArray(String[]::new);
+    }
+
+    if (identifierColumns.length == 0) {
+      Identifier tableIdent =
+          toIdentifier(args.getString(TABLE_NAME_ORDINAL), PARAMETERS[TABLE_NAME_ORDINAL].name());
+      Table table = loadSparkTable(tableIdent).table();
+      identifierColumns = table.schema().identifierFieldNames().toArray(new String[0]);
+    }
+
+    return identifierColumns;
+  }
+
+  private Dataset<Row> changelogRecords(String tableName, Map<String, String> readOptions) {
+    // no need to validate the read options here since the reader will validate them
+    return spark().read().options(readOptions).table(tableName);
+  }
+
+  private String changelogTableName(Identifier tableIdent) {
+    List<String> namespace = Lists.newArrayList();
+    namespace.addAll(Arrays.asList(tableIdent.namespace()));
+    namespace.add(tableIdent.name());
+    Identifier changelogTableIdent =
+        Identifier.of(namespace.toArray(new String[0]), SparkChangelogTable.TABLE_NAME);
+    return Spark3Util.quotedFullIdentifier(tableCatalog().name(), changelogTableIdent);
+  }
+
+  private Map<String, String> readOptions(InternalRow args) {
+    Map<String, String> options = Maps.newHashMap();
+
+    if (!args.isNullAt(OPTIONS_ORDINAL)) {
+      args.getMap(OPTIONS_ORDINAL)
+          .foreach(
+              DataTypes.StringType,
+              DataTypes.StringType,
+              (k, v) -> {
+                options.put(k.toString(), v.toString());
+                return BoxedUnit.UNIT;
+              });
+    }
+
+    return options;
+  }
+
+  @NotNull
+  private static String viewName(InternalRow args, String tableName) {
+    String viewName =
+        args.isNullAt(CHANGELOG_VIEW_NAME_ORDINAL)
+            ? null
+            : args.getString(CHANGELOG_VIEW_NAME_ORDINAL);
+    if (viewName == null) {
+      String shortTableName =
+          tableName.contains(".") ? tableName.substring(tableName.lastIndexOf(".") + 1) : tableName;
+      viewName = shortTableName + "_changes";
+    }
+    return viewName;
+  }
+
+  private Dataset<Row> transform(Dataset<Row> df, Column[] repartitionColumns) {
+    Column[] sortSpec = sortSpec(df, repartitionColumns);
+    StructType schema = df.schema();
+    String[] identifierFields =
+        Arrays.stream(repartitionColumns).map(Column::toString).toArray(String[]::new);
+
+    return df.repartition(repartitionColumns)
+        .sortWithinPartitions(sortSpec)
+        .mapPartitions(
+            (MapPartitionsFunction<Row, Row>)
+                rowIterator -> ChangelogIterator.create(rowIterator, schema, identifierFields),
+            RowEncoder.apply(df.schema()));

Review Comment:
   nit: Use the `schema` variable defined above instead of calling `df.schema()` again?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org
For additional commands, e-mail: issues-help@iceberg.apache.org