Posted to issues@iceberg.apache.org by GitBox <gi...@apache.org> on 2022/06/22 02:59:58 UTC

[GitHub] [iceberg] aokolnychyi commented on a diff in pull request #5094: Spark: Add Spark 3.2 copy on top of Spark 3.3

aokolnychyi commented on code in PR #5094:
URL: https://github.com/apache/iceberg/pull/5094#discussion_r903196822


##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.List;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.types.ArrayType$;
+import org.apache.spark.sql.types.BinaryType$;
+import org.apache.spark.sql.types.BooleanType$;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType$;
+import org.apache.spark.sql.types.DecimalType$;
+import org.apache.spark.sql.types.DoubleType$;
+import org.apache.spark.sql.types.FloatType$;
+import org.apache.spark.sql.types.IntegerType$;
+import org.apache.spark.sql.types.LongType$;
+import org.apache.spark.sql.types.MapType$;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.MetadataBuilder;
+import org.apache.spark.sql.types.StringType$;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType$;
+import org.apache.spark.sql.types.TimestampType$;
+
+class TypeToSparkType extends TypeUtil.SchemaVisitor<DataType> {
+  TypeToSparkType() {
+  }
+
+  @Override
+  public DataType schema(Schema schema, DataType structType) {
+    return structType;
+  }
+
+  @Override
+  public DataType struct(Types.StructType struct, List<DataType> fieldResults) {
+    List<Types.NestedField> fields = struct.fields();
+
+    List<StructField> sparkFields = Lists.newArrayListWithExpectedSize(fieldResults.size());
+    for (int i = 0; i < fields.size(); i += 1) {
+      Types.NestedField field = fields.get(i);
+      DataType type = fieldResults.get(i);
+      StructField sparkField = StructField.apply(
+          field.name(), type, field.isOptional(), fieldMetadata(field.fieldId()));
+      if (field.doc() != null) {
+        sparkField = sparkField.withComment(field.doc());
+      }
+      sparkFields.add(sparkField);
+    }
+
+    return StructType$.MODULE$.apply(sparkFields);
+  }
+
+  @Override
+  public DataType field(Types.NestedField field, DataType fieldResult) {
+    return fieldResult;
+  }
+
+  @Override
+  public DataType list(Types.ListType list, DataType elementResult) {
+    return ArrayType$.MODULE$.apply(elementResult, list.isElementOptional());
+  }
+
+  @Override
+  public DataType map(Types.MapType map, DataType keyResult, DataType valueResult) {
+    return MapType$.MODULE$.apply(keyResult, valueResult, map.isValueOptional());
+  }
+
+  @Override
+  public DataType primitive(Type.PrimitiveType primitive) {
+    switch (primitive.typeId()) {
+      case BOOLEAN:
+        return BooleanType$.MODULE$;
+      case INTEGER:
+        return IntegerType$.MODULE$;
+      case LONG:
+        return LongType$.MODULE$;
+      case FLOAT:
+        return FloatType$.MODULE$;
+      case DOUBLE:
+        return DoubleType$.MODULE$;
+      case DATE:
+        return DateType$.MODULE$;
+      case TIME:
+        throw new UnsupportedOperationException(
+            "Spark does not support time fields");
+      case TIMESTAMP:
+        return TimestampType$.MODULE$;
+      case STRING:
+        return StringType$.MODULE$;
+      case UUID:
+        // use String
+        return StringType$.MODULE$;
+      case FIXED:
+        return BinaryType$.MODULE$;
+      case BINARY:
+        return BinaryType$.MODULE$;
+      case DECIMAL:
+        Types.DecimalType decimal = (Types.DecimalType) primitive;
+        return DecimalType$.MODULE$.apply(decimal.precision(), decimal.scale());
+      default:
+        throw new UnsupportedOperationException(
+            "Cannot convert unknown type to Spark: " + primitive);
+    }
+  }
+
+  private Metadata fieldMetadata(int fieldId) {

Review Comment:
   Was this a necessary change?



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.List;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.types.ArrayType$;
+import org.apache.spark.sql.types.BinaryType$;
+import org.apache.spark.sql.types.BooleanType$;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType$;
+import org.apache.spark.sql.types.DecimalType$;
+import org.apache.spark.sql.types.DoubleType$;
+import org.apache.spark.sql.types.FloatType$;
+import org.apache.spark.sql.types.IntegerType$;
+import org.apache.spark.sql.types.LongType$;
+import org.apache.spark.sql.types.MapType$;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.MetadataBuilder;
+import org.apache.spark.sql.types.StringType$;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType$;
+import org.apache.spark.sql.types.TimestampType$;
+
+class TypeToSparkType extends TypeUtil.SchemaVisitor<DataType> {
+  TypeToSparkType() {
+  }
+
+  @Override
+  public DataType schema(Schema schema, DataType structType) {
+    return structType;
+  }
+
+  @Override
+  public DataType struct(Types.StructType struct, List<DataType> fieldResults) {
+    List<Types.NestedField> fields = struct.fields();
+
+    List<StructField> sparkFields = Lists.newArrayListWithExpectedSize(fieldResults.size());
+    for (int i = 0; i < fields.size(); i += 1) {
+      Types.NestedField field = fields.get(i);
+      DataType type = fieldResults.get(i);
+      StructField sparkField = StructField.apply(
+          field.name(), type, field.isOptional(), fieldMetadata(field.fieldId()));

Review Comment:
   nit: Is there a way to stay on one line here? We can either pass `field` to `fieldMetadata` or define a temp var.
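   
   For illustration, a minimal sketch of the temp-var option, keeping the names already used in this method:
   
   ```
   Metadata metadata = fieldMetadata(field.fieldId());
   StructField sparkField = StructField.apply(field.name(), type, field.isOptional(), metadata);
   ```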



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/TypeToSparkType.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.List;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.types.ArrayType$;
+import org.apache.spark.sql.types.BinaryType$;
+import org.apache.spark.sql.types.BooleanType$;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DateType$;
+import org.apache.spark.sql.types.DecimalType$;
+import org.apache.spark.sql.types.DoubleType$;
+import org.apache.spark.sql.types.FloatType$;
+import org.apache.spark.sql.types.IntegerType$;
+import org.apache.spark.sql.types.LongType$;
+import org.apache.spark.sql.types.MapType$;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.MetadataBuilder;
+import org.apache.spark.sql.types.StringType$;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType$;
+import org.apache.spark.sql.types.TimestampType$;
+
+class TypeToSparkType extends TypeUtil.SchemaVisitor<DataType> {
+  TypeToSparkType() {
+  }
+
+  @Override
+  public DataType schema(Schema schema, DataType structType) {
+    return structType;
+  }
+
+  @Override
+  public DataType struct(Types.StructType struct, List<DataType> fieldResults) {
+    List<Types.NestedField> fields = struct.fields();
+
+    List<StructField> sparkFields = Lists.newArrayListWithExpectedSize(fieldResults.size());
+    for (int i = 0; i < fields.size(); i += 1) {
+      Types.NestedField field = fields.get(i);
+      DataType type = fieldResults.get(i);
+      StructField sparkField = StructField.apply(
+          field.name(), type, field.isOptional(), fieldMetadata(field.fieldId()));
+      if (field.doc() != null) {
+        sparkField = sparkField.withComment(field.doc());
+      }
+      sparkFields.add(sparkField);
+    }
+
+    return StructType$.MODULE$.apply(sparkFields);
+  }
+
+  @Override
+  public DataType field(Types.NestedField field, DataType fieldResult) {
+    return fieldResult;
+  }
+
+  @Override
+  public DataType list(Types.ListType list, DataType elementResult) {
+    return ArrayType$.MODULE$.apply(elementResult, list.isElementOptional());
+  }
+
+  @Override
+  public DataType map(Types.MapType map, DataType keyResult, DataType valueResult) {
+    return MapType$.MODULE$.apply(keyResult, valueResult, map.isValueOptional());
+  }
+
+  @Override
+  public DataType primitive(Type.PrimitiveType primitive) {
+    switch (primitive.typeId()) {
+      case BOOLEAN:
+        return BooleanType$.MODULE$;
+      case INTEGER:
+        return IntegerType$.MODULE$;
+      case LONG:
+        return LongType$.MODULE$;
+      case FLOAT:
+        return FloatType$.MODULE$;
+      case DOUBLE:
+        return DoubleType$.MODULE$;
+      case DATE:
+        return DateType$.MODULE$;
+      case TIME:
+        throw new UnsupportedOperationException(
+            "Spark does not support time fields");
+      case TIMESTAMP:
+        return TimestampType$.MODULE$;
+      case STRING:
+        return StringType$.MODULE$;
+      case UUID:
+        // use String
+        return StringType$.MODULE$;
+      case FIXED:
+        return BinaryType$.MODULE$;
+      case BINARY:
+        return BinaryType$.MODULE$;
+      case DECIMAL:
+        Types.DecimalType decimal = (Types.DecimalType) primitive;
+        return DecimalType$.MODULE$.apply(decimal.precision(), decimal.scale());
+      default:
+        throw new UnsupportedOperationException(
+            "Cannot convert unknown type to Spark: " + primitive);
+    }
+  }
+
+  private Metadata fieldMetadata(int fieldId) {
+    Metadata metadata = Metadata.empty();

Review Comment:
   nit: what about using different return statements rather than re-assigning a var?
   
   ```
   private Metadata fieldMetadata(int fieldId) {
     if (MetadataColumns.metadataFieldIds().contains(fieldId)) {
       return new MetadataBuilder()
           .putBoolean(METADATA_COL_ATTR_KEY, true)
           .build();
     } else {
       return Metadata.empty();
     }
   }
   ```



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaOperation.java:
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import org.apache.iceberg.IsolationLevel;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.expressions.Expressions;
+import org.apache.spark.sql.connector.expressions.NamedReference;
+import org.apache.spark.sql.connector.iceberg.write.DeltaWriteBuilder;
+import org.apache.spark.sql.connector.iceberg.write.ExtendedLogicalWriteInfo;
+import org.apache.spark.sql.connector.iceberg.write.SupportsDelta;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.RowLevelOperation;
+import org.apache.spark.sql.connector.write.RowLevelOperationInfo;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+class SparkPositionDeltaOperation implements RowLevelOperation, SupportsDelta {
+
+  private final SparkSession spark;
+  private final Table table;
+  private final Command command;
+  private final IsolationLevel isolationLevel;
+
+  // lazy vars
+  private ScanBuilder lazyScanBuilder;
+  private Scan configuredScan;
+  private DeltaWriteBuilder lazyWriteBuilder;
+
+  SparkPositionDeltaOperation(SparkSession spark, Table table, RowLevelOperationInfo info,
+                              IsolationLevel isolationLevel) {
+    this.spark = spark;
+    this.table = table;
+    this.command = info.command();
+    this.isolationLevel = isolationLevel;
+  }
+
+  @Override
+  public Command command() {
+    return command;
+  }
+
+  @Override
+  public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
+    if (lazyScanBuilder == null) {
+      this.lazyScanBuilder = new SparkScanBuilder(spark, table, options) {
+        @Override
+        public Scan build() {
+          Scan scan = super.buildMergeOnReadScan();
+          SparkPositionDeltaOperation.this.configuredScan = scan;
+          return scan;
+        }
+      };
+    }
+
+    return lazyScanBuilder;
+  }
+
+  @Override
+  public DeltaWriteBuilder newWriteBuilder(LogicalWriteInfo info) {
+    if (lazyWriteBuilder == null) {
+      Preconditions.checkArgument(info instanceof ExtendedLogicalWriteInfo,

Review Comment:
   What about using a shorter message to stay on one line?
   
   ```
   Preconditions.checkArgument(info instanceof ExtendedLogicalWriteInfo, "info must be ExtendedLogicalWriteInfo");
   ```



##########
spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/sql/TestDeleteFrom.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.sql;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkCatalogTestBase;
+import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestDeleteFrom extends SparkCatalogTestBase {
+  public TestDeleteFrom(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testDeleteFromUnpartitionedTable() throws NoSuchTableException {
+    sql("CREATE TABLE %s (id bigint, data string) USING iceberg", tableName);
+
+    List<SimpleRecord> records = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(2, "b"),
+        new SimpleRecord(3, "c")
+    );
+    Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+    df.coalesce(1).writeTo(tableName).append();
+
+    assertEquals("Should have expected rows",
+        ImmutableList.of(row(1L, "a"), row(2L, "b"), row(3L, "c")),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+
+    sql("DELETE FROM %s WHERE id < 2", tableName);

Review Comment:
   Well, I am not sure this is correct as the built-in implementation is currently sub-optimal and does not have dynamic file filtering. Let me take a closer look.



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java:
##########
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.Map;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.source.HasIcebergCatalog;
+import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchFunctionException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.connector.catalog.CatalogExtension;
+import org.apache.spark.sql.connector.catalog.CatalogPlugin;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.NamespaceChange;
+import org.apache.spark.sql.connector.catalog.StagedTable;
+import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
+import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/**
+ * A Spark catalog that can also load non-Iceberg tables.
+ *
+ * @param <T> CatalogPlugin class to avoid casting to TableCatalog and SupportsNamespaces.
+ */
+public class SparkSessionCatalog<T extends TableCatalog & SupportsNamespaces>
+    extends BaseCatalog implements CatalogExtension {
+  private static final String[] DEFAULT_NAMESPACE = new String[] {"default"};
+
+  private String catalogName = null;
+  private TableCatalog icebergCatalog = null;
+  private StagingTableCatalog asStagingCatalog = null;
+  private T sessionCatalog = null;
+  private boolean createParquetAsIceberg = false;
+  private boolean createAvroAsIceberg = false;
+  private boolean createOrcAsIceberg = false;
+
+  /**
+   * Build a {@link SparkCatalog} to be used for Iceberg operations.
+   * <p>
+   * The default implementation creates a new SparkCatalog with the session catalog's name and options.
+   *
+   * @param name catalog name
+   * @param options catalog options
+   * @return a SparkCatalog to be used for Iceberg tables
+   */
+  protected TableCatalog buildSparkCatalog(String name, CaseInsensitiveStringMap options) {
+    SparkCatalog newCatalog = new SparkCatalog();
+    newCatalog.initialize(name, options);
+    return newCatalog;
+  }
+
+  @Override
+  public String[] defaultNamespace() {
+    return DEFAULT_NAMESPACE;
+  }
+
+  @Override
+  public String[][] listNamespaces() throws NoSuchNamespaceException {
+    return getSessionCatalog().listNamespaces();
+  }
+
+  @Override
+  public String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceException {
+    return getSessionCatalog().listNamespaces(namespace);
+  }
+
+  @Override
+  public boolean namespaceExists(String[] namespace) {
+    return getSessionCatalog().namespaceExists(namespace);
+  }
+
+  @Override
+  public Map<String, String> loadNamespaceMetadata(String[] namespace) throws NoSuchNamespaceException {
+    return getSessionCatalog().loadNamespaceMetadata(namespace);
+  }
+
+  @Override
+  public void createNamespace(String[] namespace, Map<String, String> metadata) throws NamespaceAlreadyExistsException {
+    getSessionCatalog().createNamespace(namespace, metadata);
+  }
+
+  @Override
+  public void alterNamespace(String[] namespace, NamespaceChange... changes) throws NoSuchNamespaceException {
+    getSessionCatalog().alterNamespace(namespace, changes);
+  }
+
+  @Override
+  public boolean dropNamespace(String[] namespace, boolean cascade)
+      throws NoSuchNamespaceException, NonEmptyNamespaceException {
+    return getSessionCatalog().dropNamespace(namespace, cascade);
+  }
+
+  @Override
+  public Identifier[] listTables(String[] namespace) throws NoSuchNamespaceException {
+    // delegate to the session catalog because all tables share the same namespace
+    return getSessionCatalog().listTables(namespace);
+  }
+
+  @Override
+  public Table loadTable(Identifier ident) throws NoSuchTableException {
+    try {
+      return icebergCatalog.loadTable(ident);
+    } catch (NoSuchTableException e) {
+      return getSessionCatalog().loadTable(ident);
+    }
+  }
+
+  @Override
+  public void invalidateTable(Identifier ident) {
+    // We do not need to check whether the table exists and whether
+    // it is an Iceberg table to reduce remote service requests.
+    icebergCatalog.invalidateTable(ident);
+    getSessionCatalog().invalidateTable(ident);
+  }
+
+  @Override
+  public Table createTable(Identifier ident, StructType schema, Transform[] partitions,
+                           Map<String, String> properties)
+      throws TableAlreadyExistsException, NoSuchNamespaceException {
+    String provider = properties.get("provider");
+    if (useIceberg(provider)) {
+      return icebergCatalog.createTable(ident, schema, partitions, properties);
+    } else {
+      // delegate to the session catalog
+      return getSessionCatalog().createTable(ident, schema, partitions, properties);
+    }
+  }
+
+  @Override
+  public StagedTable stageCreate(Identifier ident, StructType schema, Transform[] partitions,
+                                 Map<String, String> properties)
+      throws TableAlreadyExistsException, NoSuchNamespaceException {
+    String provider = properties.get("provider");
+    TableCatalog catalog;
+    if (useIceberg(provider)) {
+      if (asStagingCatalog != null) {
+        return asStagingCatalog.stageCreate(ident, schema, partitions, properties);
+      }
+      catalog = icebergCatalog;
+    } else {
+      catalog = getSessionCatalog();
+    }
+
+    // create the table with the session catalog, then wrap it in a staged table that will delete to roll back
+    Table table = catalog.createTable(ident, schema, partitions, properties);
+    return new RollbackStagedTable(catalog, ident, table);
+  }
+
+  @Override
+  public StagedTable stageReplace(Identifier ident, StructType schema, Transform[] partitions,
+                                  Map<String, String> properties)
+      throws NoSuchNamespaceException, NoSuchTableException {
+    String provider = properties.get("provider");
+    TableCatalog catalog;
+    if (useIceberg(provider)) {
+      if (asStagingCatalog != null) {
+        return asStagingCatalog.stageReplace(ident, schema, partitions, properties);
+      }
+      catalog = icebergCatalog;
+    } else {
+      catalog = getSessionCatalog();
+    }
+
+    // attempt to drop the table and fail if it doesn't exist
+    if (!catalog.dropTable(ident)) {
+      throw new NoSuchTableException(ident);
+    }
+
+    try {
+      // create the table with the session catalog, then wrap it in a staged table that will delete to roll back
+      Table table = catalog.createTable(ident, schema, partitions, properties);
+      return new RollbackStagedTable(catalog, ident, table);
+
+    } catch (TableAlreadyExistsException e) {
+      // the table was deleted, but now already exists again. retry the replace.
+      return stageReplace(ident, schema, partitions, properties);
+    }
+  }
+
+  @Override
+  public StagedTable stageCreateOrReplace(Identifier ident, StructType schema, Transform[] partitions,
+                                          Map<String, String> properties) throws NoSuchNamespaceException {
+    String provider = properties.get("provider");
+    TableCatalog catalog;
+    if (useIceberg(provider)) {
+      if (asStagingCatalog != null) {
+        return asStagingCatalog.stageCreateOrReplace(ident, schema, partitions, properties);
+      }
+      catalog = icebergCatalog;
+    } else {
+      catalog = getSessionCatalog();
+    }
+
+    // drop the table if it exists
+    catalog.dropTable(ident);
+
+    try {
+      // create the table with the session catalog, then wrap it in a staged table that will delete to roll back
+      Table sessionCatalogTable = catalog.createTable(ident, schema, partitions, properties);
+      return new RollbackStagedTable(catalog, ident, sessionCatalogTable);
+
+    } catch (TableAlreadyExistsException e) {
+      // the table was deleted, but now already exists again. retry the replace.
+      return stageCreateOrReplace(ident, schema, partitions, properties);
+    }
+  }
+
+  @Override
+  public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
+    if (icebergCatalog.tableExists(ident)) {
+      return icebergCatalog.alterTable(ident, changes);
+    } else {
+      return getSessionCatalog().alterTable(ident, changes);
+    }
+  }
+
+  @Override
+  public boolean dropTable(Identifier ident) {
+    // no need to check table existence to determine which catalog to use. if a table doesn't exist then both are
+    // required to return false.
+    return icebergCatalog.dropTable(ident) || getSessionCatalog().dropTable(ident);
+  }
+
+  @Override
+  public boolean purgeTable(Identifier ident) {
+    // no need to check table existence to determine which catalog to use. if a table doesn't exist then both are
+    // required to return false.
+    return icebergCatalog.purgeTable(ident) || getSessionCatalog().purgeTable(ident);
+  }
+
+  @Override
+  public void renameTable(Identifier from, Identifier to) throws NoSuchTableException, TableAlreadyExistsException {
+    // rename is not supported by HadoopCatalog. to avoid UnsupportedOperationException for session catalog tables,
+    // check table existence first to ensure that the table belongs to the Iceberg catalog.
+    if (icebergCatalog.tableExists(from)) {
+      icebergCatalog.renameTable(from, to);
+    } else {
+      getSessionCatalog().renameTable(from, to);
+    }
+  }
+
+  @Override
+  public final void initialize(String name, CaseInsensitiveStringMap options) {
+    this.catalogName = name;
+    this.icebergCatalog = buildSparkCatalog(name, options);
+    if (icebergCatalog instanceof StagingTableCatalog) {
+      this.asStagingCatalog = (StagingTableCatalog) icebergCatalog;
+    }
+
+    this.createParquetAsIceberg = options.getBoolean("parquet-enabled", createParquetAsIceberg);
+    this.createAvroAsIceberg = options.getBoolean("avro-enabled", createAvroAsIceberg);
+    this.createOrcAsIceberg = options.getBoolean("orc-enabled", createOrcAsIceberg);
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void setDelegateCatalog(CatalogPlugin sparkSessionCatalog) {
+    if (sparkSessionCatalog instanceof TableCatalog && sparkSessionCatalog instanceof SupportsNamespaces) {
+      this.sessionCatalog = (T) sparkSessionCatalog;
+    } else {
+      throw new IllegalArgumentException("Invalid session catalog: " + sparkSessionCatalog);
+    }
+  }
+
+  @Override
+  public String name() {
+    return catalogName;
+  }
+
+  private boolean useIceberg(String provider) {
+    if (provider == null || "iceberg".equalsIgnoreCase(provider)) {
+      return true;
+    } else if (createParquetAsIceberg && "parquet".equalsIgnoreCase(provider)) {
+      return true;
+    } else if (createAvroAsIceberg && "avro".equalsIgnoreCase(provider)) {
+      return true;
+    } else if (createOrcAsIceberg && "orc".equalsIgnoreCase(provider)) {
+      return true;
+    }
+
+    return false;
+  }
+
+  private T getSessionCatalog() {
+    Preconditions.checkNotNull(sessionCatalog, "Delegated SessionCatalog is missing. " +
+        "Please make sure your are replacing Spark's default catalog, named 'spark_catalog'.");
+    return sessionCatalog;
+  }
+
+  @Override
+  public Catalog icebergCatalog() {
+    Preconditions.checkArgument(icebergCatalog instanceof HasIcebergCatalog,
+        "Cannot return underlying Iceberg Catalog, wrapped catalog does not contain an Iceberg Catalog");
+    return ((HasIcebergCatalog) icebergCatalog).icebergCatalog();
+  }
+
+  @Override
+  public Identifier[] listFunctions(String[] namespace) {
+    return new Identifier[0];

Review Comment:
   @rdblue, should we delegate to the Spark catalog in new function-related methods?
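   
   For illustration, one possible shape of that delegation (a sketch only, assuming the wrapped session catalog implements Spark's `org.apache.spark.sql.connector.catalog.FunctionCatalog`; `loadFunction` could follow the same pattern):
   
   ```
   @Override
   public Identifier[] listFunctions(String[] namespace) throws NoSuchNamespaceException {
     // sketch: fall back to the delegate session catalog for function lookups when it supports them
     if (getSessionCatalog() instanceof FunctionCatalog) {
       return ((FunctionCatalog) getSessionCatalog()).listFunctions(namespace);
     }
   
     return new Identifier[0];
   }
   ```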



##########
core/src/main/java/org/apache/iceberg/MetadataColumns.java:
##########
@@ -31,6 +31,8 @@ public class MetadataColumns {
   private MetadataColumns() {
   }
 
+  public static final String METADATA_COL_ATTR_KEY = "__metadata_col";

Review Comment:
   I don't think this belongs here. I think it is specific to Spark. What about defining a private constant in the utility that converts an Iceberg schema to Spark? That's the only place where it is needed, right?
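   
   For illustration, a sketch of that option, assuming the key moves into `TypeToSparkType` as a private constant:
   
   ```
   class TypeToSparkType extends TypeUtil.SchemaVisitor<DataType> {
     // Spark-specific attribute key, kept local to the Iceberg-to-Spark schema conversion
     private static final String METADATA_COL_ATTR_KEY = "__metadata_col";
   
     // ... existing visitor methods ...
   
     private Metadata fieldMetadata(int fieldId) {
       if (MetadataColumns.metadataFieldIds().contains(fieldId)) {
         return new MetadataBuilder().putBoolean(METADATA_COL_ATTR_KEY, true).build();
       }
   
       return Metadata.empty();
     }
   }
   ```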



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java:
##########
@@ -0,0 +1,665 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Map;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.IsolationLevel;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SerializableTable;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.SnapshotUpdate;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.deletes.PositionDelete;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.BasePositionDeltaWriter;
+import org.apache.iceberg.io.ClusteredDataWriter;
+import org.apache.iceberg.io.ClusteredPositionDeleteWriter;
+import org.apache.iceberg.io.DataWriteResult;
+import org.apache.iceberg.io.DeleteWriteResult;
+import org.apache.iceberg.io.FanoutDataWriter;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.io.PartitioningWriter;
+import org.apache.iceberg.io.PositionDeltaWriter;
+import org.apache.iceberg.io.WriteResult;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.CommitMetadata;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkWriteConf;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.CharSequenceSet;
+import org.apache.iceberg.util.StructProjection;
+import org.apache.iceberg.util.Tasks;
+import org.apache.iceberg.util.ThreadPools;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.distributions.Distribution;
+import org.apache.spark.sql.connector.expressions.SortOrder;
+import org.apache.spark.sql.connector.iceberg.write.DeltaBatchWrite;
+import org.apache.spark.sql.connector.iceberg.write.DeltaWrite;
+import org.apache.spark.sql.connector.iceberg.write.DeltaWriter;
+import org.apache.spark.sql.connector.iceberg.write.DeltaWriterFactory;
+import org.apache.spark.sql.connector.iceberg.write.ExtendedLogicalWriteInfo;
+import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
+import org.apache.spark.sql.connector.write.RequiresDistributionAndOrdering;
+import org.apache.spark.sql.connector.write.RowLevelOperation.Command;
+import org.apache.spark.sql.connector.write.WriterCommitMessage;
+import org.apache.spark.sql.types.StructType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.IsolationLevel.SERIALIZABLE;
+
+class SparkPositionDeltaWrite implements DeltaWrite, RequiresDistributionAndOrdering {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkPositionDeltaWrite.class);
+
+  private final JavaSparkContext sparkContext;
+  private final Table table;
+  private final Command command;
+  private final SparkBatchQueryScan scan;
+  private final IsolationLevel isolationLevel;
+  private final Context context;
+  private final String applicationId;
+  private final boolean wapEnabled;
+  private final String wapId;
+  private final Map<String, String> extraSnapshotMetadata;
+  private final Distribution requiredDistribution;
+  private final SortOrder[] requiredOrdering;
+
+  private boolean cleanupOnAbort = true;
+
+  SparkPositionDeltaWrite(SparkSession spark, Table table, Command command, SparkBatchQueryScan scan,
+                          IsolationLevel isolationLevel, SparkWriteConf writeConf,
+                          ExtendedLogicalWriteInfo info, Schema dataSchema,
+                          Distribution requiredDistribution, SortOrder[] requiredOrdering) {
+    this.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
+    this.table = table;
+    this.command = command;
+    this.scan = scan;
+    this.isolationLevel = isolationLevel;
+    this.context = new Context(dataSchema, writeConf, info);
+    this.applicationId = spark.sparkContext().applicationId();
+    this.wapEnabled = writeConf.wapEnabled();
+    this.wapId = writeConf.wapId();
+    this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata();
+    this.requiredDistribution = requiredDistribution;
+    this.requiredOrdering = requiredOrdering;
+  }
+
+  @Override
+  public Distribution requiredDistribution() {
+    return requiredDistribution;
+  }
+
+  @Override
+  public SortOrder[] requiredOrdering() {
+    return requiredOrdering;
+  }
+
+  @Override
+  public DeltaBatchWrite toBatch() {
+    return new PositionDeltaBatchWrite();
+  }
+
+  private static <T extends ContentFile<T>> void cleanFiles(FileIO io, Iterable<T> files) {
+    Tasks.foreach(files)
+        .executeWith(ThreadPools.getWorkerPool())
+        .throwFailureWhenFinished()
+        .noRetry()
+        .run(file -> io.deleteFile(file.path().toString()));
+  }
+
+  private class PositionDeltaBatchWrite implements DeltaBatchWrite {
+
+    @Override
+    public DeltaWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
+      // broadcast the table metadata as the writer factory will be sent to executors
+      Broadcast<Table> tableBroadcast = sparkContext.broadcast(SerializableTable.copyOf(table));
+      return new PositionDeltaWriteFactory(tableBroadcast, command, context);
+    }
+
+    @Override
+    public void commit(WriterCommitMessage[] messages) {
+      RowDelta rowDelta = table.newRowDelta();
+
+      CharSequenceSet referencedDataFiles = CharSequenceSet.empty();
+
+      int addedDataFilesCount = 0;
+      int addedDeleteFilesCount = 0;
+
+      for (WriterCommitMessage message : messages) {
+        DeltaTaskCommit taskCommit = (DeltaTaskCommit) message;
+
+        for (DataFile dataFile : taskCommit.dataFiles()) {
+          rowDelta.addRows(dataFile);
+          addedDataFilesCount += 1;
+        }
+
+        for (DeleteFile deleteFile : taskCommit.deleteFiles()) {
+          rowDelta.addDeletes(deleteFile);
+          addedDeleteFilesCount += 1;
+        }
+
+        referencedDataFiles.addAll(Arrays.asList(taskCommit.referencedDataFiles()));
+      }
+
+      // the scan may be null if the optimizer replaces it with an empty relation (e.g. the cond is false)
+      // no validation is needed in this case as the command does not depend on the scanned table state
+      if (scan != null) {
+        Expression conflictDetectionFilter = conflictDetectionFilter(scan);
+        rowDelta.conflictDetectionFilter(conflictDetectionFilter);
+
+        rowDelta.validateDataFilesExist(referencedDataFiles);
+
+        if (scan.snapshotId() != null) {
+          // set the read snapshot ID to check only snapshots that happened after the table was read
+          // otherwise, the validation will go through all snapshots present in the table
+          rowDelta.validateFromSnapshot(scan.snapshotId());
+        }
+
+        if (command == Command.UPDATE || command == Command.MERGE) {

Review Comment:
   If we adapt `checkstyle.xml`, changes like this won't be needed in this class. 



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteOperation.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark.source;
+
+import org.apache.iceberg.IsolationLevel;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.expressions.Expressions;
+import org.apache.spark.sql.connector.expressions.NamedReference;
+import org.apache.spark.sql.connector.read.Scan;
+import org.apache.spark.sql.connector.read.ScanBuilder;
+import org.apache.spark.sql.connector.write.LogicalWriteInfo;
+import org.apache.spark.sql.connector.write.RowLevelOperation;
+import org.apache.spark.sql.connector.write.RowLevelOperationInfo;
+import org.apache.spark.sql.connector.write.WriteBuilder;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+class SparkCopyOnWriteOperation implements RowLevelOperation {
+
+  private final SparkSession spark;
+  private final Table table;
+  private final Command command;
+  private final IsolationLevel isolationLevel;
+
+  // lazy vars
+  private ScanBuilder lazyScanBuilder;
+  private Scan configuredScan;
+  private WriteBuilder lazyWriteBuilder;
+
+  SparkCopyOnWriteOperation(SparkSession spark, Table table, RowLevelOperationInfo info,
+                            IsolationLevel isolationLevel) {
+    this.spark = spark;
+    this.table = table;
+    this.command = info.command();
+    this.isolationLevel = isolationLevel;
+  }
+
+  @Override
+  public Command command() {
+    return command;
+  }
+
+  @Override
+  public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
+    if (lazyScanBuilder == null) {
+      lazyScanBuilder = new SparkScanBuilder(spark, table, options) {
+        @Override
+        public Scan build() {
+          Scan scan = super.buildCopyOnWriteScan();
+          SparkCopyOnWriteOperation.this.configuredScan = scan;
+          return scan;
+        }
+      };
+    }
+
+    return lazyScanBuilder;
+  }
+
+  @Override
+  public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
+    if (lazyWriteBuilder == null) {
+      Preconditions.checkState(configuredScan != null, "Write must be configured after scan");
+      SparkWriteBuilder writeBuilder = new SparkWriteBuilder(spark, table, info);
+      lazyWriteBuilder = writeBuilder.overwriteFiles(configuredScan, command, isolationLevel);
+    }
+
+    return lazyWriteBuilder;
+  }
+
+  @Override
+  public NamedReference[] requiredMetadataAttributes() {
+    NamedReference file = Expressions.column(MetadataColumns.FILE_PATH.name());
+    NamedReference pos = Expressions.column(MetadataColumns.ROW_POSITION.name());
+
+    if (command == Command.DELETE || command == Command.UPDATE) {

Review Comment:
   Can we adapt `checkstyle.xml` to include the new class instead of qualifying? This should reduce the number of changes.
   
   We have the following entry in `checkstyle.xml`:
   ```
   org.apache.spark.sql.connector.iceberg.write.RowLevelOperation.Command.*,
   ```
   
   I'd simply add the Spark class now.
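   
   For reference, the adjusted entry could simply list the new class alongside the existing one, assuming the same allow-list format (the Spark 3.3 class lives in `org.apache.spark.sql.connector.write`):
   
   ```
   org.apache.spark.sql.connector.iceberg.write.RowLevelOperation.Command.*,
   org.apache.spark.sql.connector.write.RowLevelOperation.Command.*,
   ```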



##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkDistributionAndOrderingUtil.java:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.spark;
+
+import java.util.List;
+import org.apache.iceberg.DistributionMode;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ObjectArrays;
+import org.apache.iceberg.transforms.SortOrderVisitor;
+import org.apache.iceberg.util.SortOrderUtil;
+import org.apache.spark.sql.connector.distributions.ClusteredDistribution;
+import org.apache.spark.sql.connector.distributions.Distribution;
+import org.apache.spark.sql.connector.distributions.Distributions;
+import org.apache.spark.sql.connector.distributions.OrderedDistribution;
+import org.apache.spark.sql.connector.distributions.UnspecifiedDistribution;
+import org.apache.spark.sql.connector.expressions.Expression;
+import org.apache.spark.sql.connector.expressions.Expressions;
+import org.apache.spark.sql.connector.expressions.NamedReference;
+import org.apache.spark.sql.connector.expressions.SortDirection;
+import org.apache.spark.sql.connector.expressions.SortOrder;
+import org.apache.spark.sql.connector.write.RowLevelOperation.Command;
+
+public class SparkDistributionAndOrderingUtil {
+
+  private static final NamedReference SPEC_ID = Expressions.column(MetadataColumns.SPEC_ID.name());
+  private static final NamedReference PARTITION = Expressions.column(MetadataColumns.PARTITION_COLUMN_NAME);
+  private static final NamedReference FILE_PATH = Expressions.column(MetadataColumns.FILE_PATH.name());
+  private static final NamedReference ROW_POSITION = Expressions.column(MetadataColumns.ROW_POSITION.name());
+
+  private static final SortOrder SPEC_ID_ORDER = Expressions.sort(SPEC_ID, SortDirection.ASCENDING);
+  private static final SortOrder PARTITION_ORDER = Expressions.sort(PARTITION, SortDirection.ASCENDING);
+  private static final SortOrder FILE_PATH_ORDER = Expressions.sort(FILE_PATH, SortDirection.ASCENDING);
+  private static final SortOrder ROW_POSITION_ORDER = Expressions.sort(ROW_POSITION, SortDirection.ASCENDING);
+
+  private static final SortOrder[] EXISTING_FILE_ORDERING = new SortOrder[]{FILE_PATH_ORDER, ROW_POSITION_ORDER};
+  private static final SortOrder[] POSITION_DELETE_ORDERING = new SortOrder[]{
+      SPEC_ID_ORDER, PARTITION_ORDER, FILE_PATH_ORDER, ROW_POSITION_ORDER
+  };
+
+  private SparkDistributionAndOrderingUtil() {
+  }
+
+  public static Distribution buildRequiredDistribution(Table table, DistributionMode distributionMode) {
+    switch (distributionMode) {
+      case NONE:
+        return Distributions.unspecified();
+
+      case HASH:
+        return Distributions.clustered(Spark3Util.toTransforms(table.spec()));
+
+      case RANGE:
+        return Distributions.ordered(buildTableOrdering(table));
+
+      default:
+        throw new IllegalArgumentException("Unsupported distribution mode: " + distributionMode);
+    }
+  }
+
+  public static SortOrder[] buildRequiredOrdering(Table table, Distribution distribution) {
+    if (distribution instanceof OrderedDistribution) {
+      OrderedDistribution orderedDistribution = (OrderedDistribution) distribution;
+      return orderedDistribution.ordering();
+    } else {
+      return buildTableOrdering(table);
+    }
+  }
+
+  public static Distribution buildCopyOnWriteDistribution(Table table, Command command,
+                                                          DistributionMode distributionMode) {
+    if (command == Command.DELETE || command == Command.UPDATE) {
+      return buildCopyOnWriteDeleteUpdateDistribution(table, distributionMode);
+    } else {
+      return buildRequiredDistribution(table, distributionMode);
+    }
+  }
+
+  private static Distribution buildCopyOnWriteDeleteUpdateDistribution(Table table, DistributionMode distributionMode) {
+    switch (distributionMode) {
+      case NONE:
+        return Distributions.unspecified();
+
+      case HASH:
+        Expression[] clustering = new Expression[]{FILE_PATH};
+        return Distributions.clustered(clustering);
+
+      case RANGE:
+        SortOrder[] tableOrdering = buildTableOrdering(table);
+        if (table.sortOrder().isSorted()) {
+          return Distributions.ordered(tableOrdering);
+        } else {
+          SortOrder[] ordering = ObjectArrays.concat(tableOrdering, EXISTING_FILE_ORDERING, SortOrder.class);
+          return Distributions.ordered(ordering);
+        }
+
+      default:
+        throw new IllegalArgumentException("Unexpected distribution mode: " + distributionMode);
+    }
+  }
+
+  public static SortOrder[] buildCopyOnWriteOrdering(Table table, Command command, Distribution distribution) {
+    if (command == Command.DELETE || command == Command.UPDATE) {

Review Comment:
   nit: same in this class



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

